This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a3f06d88140 KAFKA-14013: Limit the length of the `reason` field sent 
on the wire (#12388)
a3f06d88140 is described below

commit a3f06d88140783a73e0301c4c1a521b2a4fc97b9
Author: Eugene Tolbakov <[email protected]>
AuthorDate: Tue Jul 12 08:31:16 2022 +0100

    KAFKA-14013: Limit the length of the `reason` field sent on the wire 
(#12388)
    
    KIP-800 added the `reason` field to the JoinGroupRequest and the 
LeaveGroupRequest as a means to provide more information to the group 
coordinator. In https://issues.apache.org/jira/browse/KAFKA-13998, we 
discovered that the size of the field is limited to 32767 chars by our 
serialisation mechanism. At the moment, the field either provided directly by 
the user or constructed internally is directly set regardless of its length.
    
    This patch sends only the first 255 chars of the user-provided or 
internally generated reason on the wire. Given the purpose of this field, that 
seems acceptable and that should still provide enough information to operators 
to understand the cause of a rebalance.
    
    Reviewers: David Jacot <[email protected]>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |  3 +-
 .../consumer/internals/AbstractCoordinator.java    | 10 +++----
 .../kafka/common/requests/JoinGroupRequest.java    | 14 ++++++++++
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  7 +++++
 .../internals/AbstractCoordinatorTest.java         | 32 ++++++++++++++++++++--
 5 files changed, 58 insertions(+), 8 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index aad2610c94a..2b2642e3518 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -218,6 +218,7 @@ import 
org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
 import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
 import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest;
 import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
+import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.ListGroupsRequest;
 import org.apache.kafka.common.requests.ListGroupsResponse;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
@@ -3756,7 +3757,7 @@ public class KafkaAdminClient extends AdminClient {
     public RemoveMembersFromConsumerGroupResult 
removeMembersFromConsumerGroup(String groupId,
                                                                                
RemoveMembersFromConsumerGroupOptions options) {
         String reason = options.reason() == null || options.reason().isEmpty() 
?
-            DEFAULT_LEAVE_GROUP_REASON : options.reason();
+            DEFAULT_LEAVE_GROUP_REASON : 
JoinGroupRequest.maybeTruncateReason(options.reason());
 
         List<MemberIdentity> members;
         if (options.removeAll()) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index fa80727d5af..c9ad797ebeb 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -478,11 +478,11 @@ public abstract class AbstractCoordinator implements 
Closeable {
 
                 resetJoinGroupFuture();
                 synchronized (AbstractCoordinator.this) {
-                    final String shortReason = String.format("rebalance failed 
due to %s",
-                        exception.getClass().getSimpleName());
+                    final String simpleName = 
exception.getClass().getSimpleName();
+                    final String shortReason = String.format("rebalance failed 
due to %s", simpleName);
                     final String fullReason = String.format("rebalance failed 
due to '%s' (%s)",
                         exception.getMessage(),
-                        exception.getClass().getSimpleName());
+                        simpleName);
                     requestRejoin(shortReason, fullReason);
                 }
 
@@ -559,7 +559,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
                         .setProtocolType(protocolType())
                         .setProtocols(metadata())
                         
.setRebalanceTimeoutMs(this.rebalanceConfig.rebalanceTimeoutMs)
-                        .setReason(this.rejoinReason)
+                        
.setReason(JoinGroupRequest.maybeTruncateReason(this.rejoinReason))
         );
 
         log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, 
this.coordinator);
@@ -1114,7 +1114,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
                 generation.memberId, coordinator, leaveReason);
             LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(
                 rebalanceConfig.groupId,
-                Collections.singletonList(new 
MemberIdentity().setMemberId(generation.memberId).setReason(leaveReason))
+                Collections.singletonList(new 
MemberIdentity().setMemberId(generation.memberId).setReason(JoinGroupRequest.maybeTruncateReason(leaveReason)))
             );
 
             future = client.send(coordinator, request).compose(new 
LeaveGroupResponseHandler(generation));
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 220a59d1834..774506357bb 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -70,6 +70,20 @@ public class JoinGroupRequest extends AbstractRequest {
         });
     }
 
+    /**
+     * Ensures that the provided {@code reason} remains within a range of 255 
chars.
+     * @param reason This is the reason that is sent to the broker over the 
wire
+     *               as a part of {@code JoinGroupRequest} or {@code 
LeaveGroupRequest} messages.
+     * @return a provided reason as is or truncated reason if it exceeds the 
255 chars threshold.
+     */
+    public static String maybeTruncateReason(final String reason) {
+        if (reason.length() > 255) {
+            return reason.substring(0, 255);
+        } else {
+            return reason;
+        }
+    }
+
     public JoinGroupRequest(JoinGroupRequestData data, short version) {
         super(ApiKeys.JOIN_GROUP, version);
         this.data = data;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 523a961f964..61a2aaa00b2 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -4117,6 +4117,13 @@ public class KafkaAdminClientTest {
         testRemoveMembersFromGroup("testing remove members reason", "testing 
remove members reason");
     }
 
+    @Test
+    public void testRemoveMembersFromGroupTruncatesReason() throws Exception {
+        final String reason = "Very 
looooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooong
 reason that is 271 characters long to make sure that length limit logic 
handles the scenario nicely";
+        final String truncatedReason = reason.substring(0, 255);
+        testRemoveMembersFromGroup(reason, truncatedReason);
+    }
+
     @Test
     public void testRemoveMembersFromGroupDefaultReason() throws Exception {
         testRemoveMembersFromGroup(null, DEFAULT_LEAVE_GROUP_REASON);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 6812af29ce5..ddbebb6dde6 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -571,6 +571,15 @@ public class AbstractCoordinatorTest {
         expectSyncGroup(generation, memberId);
         ensureActiveGroup(generation, memberId);
         assertEquals("", coordinator.rejoinReason());
+
+        // check limit length of reason field
+        final String reason = "Very 
looooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooong
 reason that is 271 characters long to make sure that length limit logic 
handles the scenario nicely";
+        final String truncatedReason = reason.substring(0, 255);
+        expectJoinGroup(memberId, truncatedReason, generation, memberId);
+        expectSyncGroup(generation, memberId);
+        coordinator.requestRejoin(reason);
+        ensureActiveGroup(generation, memberId);
+        assertEquals("", coordinator.rejoinReason());
     }
 
     private void ensureActiveGroup(
@@ -1159,6 +1168,19 @@ public class AbstractCoordinatorTest {
         assertTrue(leaveGroupFuture.succeeded());
     }
 
+    @Test
+    public void testHandleNormalLeaveGroupResponseAndTruncatedLeaveReason() {
+        MemberResponse memberResponse = new MemberResponse()
+                .setMemberId(memberId)
+                .setErrorCode(Errors.NONE.code());
+        LeaveGroupResponse response =
+                leaveGroupResponse(Collections.singletonList(memberResponse));
+        String leaveReason = "Very 
looooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooong
 leaveReason that is 271 characters long to make sure that length limit logic 
handles the scenario nicely";
+        RequestFuture<Void> leaveGroupFuture = setupLeaveGroup(response, 
leaveReason, leaveReason.substring(0, 255));
+        assertNotNull(leaveGroupFuture);
+        assertTrue(leaveGroupFuture.succeeded());
+    }
+
     @Test
     public void testHandleMultipleMembersLeaveGroupResponse() {
         MemberResponse memberResponse = new MemberResponse()
@@ -1193,6 +1215,12 @@ public class AbstractCoordinatorTest {
     }
 
     private RequestFuture<Void> setupLeaveGroup(LeaveGroupResponse 
leaveGroupResponse) {
+        return setupLeaveGroup(leaveGroupResponse, "test maybe leave group", 
"test maybe leave group");
+    }
+
+    private RequestFuture<Void> setupLeaveGroup(LeaveGroupResponse 
leaveGroupResponse,
+                                                String leaveReason,
+                                                String expectedLeaveReason) {
         setupCoordinator(RETRY_BACKOFF_MS, Integer.MAX_VALUE, 
Optional.empty());
 
         mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
@@ -1204,11 +1232,11 @@ public class AbstractCoordinatorTest {
             }
             LeaveGroupRequestData leaveGroupRequest = ((LeaveGroupRequest) 
body).data();
             return 
leaveGroupRequest.members().get(0).memberId().equals(memberId) &&
-                   leaveGroupRequest.members().get(0).reason().equals("test 
maybe leave group");
+                   
leaveGroupRequest.members().get(0).reason().equals(expectedLeaveReason);
         }, leaveGroupResponse);
 
         coordinator.ensureActiveGroup();
-        return coordinator.maybeLeaveGroup("test maybe leave group");
+        return coordinator.maybeLeaveGroup(leaveReason);
     }
 
     @Test

Reply via email to