This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new da055f121e7 KAFKA-14013: Limit the length of the `reason` field sent
on the wire (#12388)
da055f121e7 is described below
commit da055f121e7cab109ee1573fe925c453d567c576
Author: Eugene Tolbakov <[email protected]>
AuthorDate: Tue Jul 12 08:31:16 2022 +0100
KAFKA-14013: Limit the length of the `reason` field sent on the wire
(#12388)
KIP-800 added the `reason` field to the JoinGroupRequest and the
LeaveGroupRequest as a means to provide more information to the group
coordinator. In https://issues.apache.org/jira/browse/KAFKA-13998, we
discovered that the size of the field is limited to 32767 chars by our
serialisation mechanism. At the moment, the field, whether provided directly by
the user or constructed internally, is set as is regardless of its length.
This patch sends only the first 255 chars of the user-provided or
internally generated reason on the wire. Given the purpose of this field, that
seems acceptable and that should still provide enough information to operators
to understand the cause of a rebalance.
Reviewers: David Jacot <[email protected]>
---
.../kafka/clients/admin/KafkaAdminClient.java | 3 +-
.../consumer/internals/AbstractCoordinator.java | 10 +++----
.../kafka/common/requests/JoinGroupRequest.java | 14 ++++++++++
.../kafka/clients/admin/KafkaAdminClientTest.java | 7 +++++
.../internals/AbstractCoordinatorTest.java | 32 ++++++++++++++++++++--
5 files changed, 58 insertions(+), 8 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index aad2610c94a..2b2642e3518 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -218,6 +218,7 @@ import
org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest;
import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
+import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.ListGroupsRequest;
import org.apache.kafka.common.requests.ListGroupsResponse;
import org.apache.kafka.common.requests.ListOffsetsRequest;
@@ -3756,7 +3757,7 @@ public class KafkaAdminClient extends AdminClient {
public RemoveMembersFromConsumerGroupResult
removeMembersFromConsumerGroup(String groupId,
RemoveMembersFromConsumerGroupOptions options) {
String reason = options.reason() == null || options.reason().isEmpty()
?
- DEFAULT_LEAVE_GROUP_REASON : options.reason();
+ DEFAULT_LEAVE_GROUP_REASON :
JoinGroupRequest.maybeTruncateReason(options.reason());
List<MemberIdentity> members;
if (options.removeAll()) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index fa80727d5af..c9ad797ebeb 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -478,11 +478,11 @@ public abstract class AbstractCoordinator implements
Closeable {
resetJoinGroupFuture();
synchronized (AbstractCoordinator.this) {
- final String shortReason = String.format("rebalance failed
due to %s",
- exception.getClass().getSimpleName());
+ final String simpleName =
exception.getClass().getSimpleName();
+ final String shortReason = String.format("rebalance failed
due to %s", simpleName);
final String fullReason = String.format("rebalance failed
due to '%s' (%s)",
exception.getMessage(),
- exception.getClass().getSimpleName());
+ simpleName);
requestRejoin(shortReason, fullReason);
}
@@ -559,7 +559,7 @@ public abstract class AbstractCoordinator implements
Closeable {
.setProtocolType(protocolType())
.setProtocols(metadata())
.setRebalanceTimeoutMs(this.rebalanceConfig.rebalanceTimeoutMs)
- .setReason(this.rejoinReason)
+
.setReason(JoinGroupRequest.maybeTruncateReason(this.rejoinReason))
);
log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder,
this.coordinator);
@@ -1114,7 +1114,7 @@ public abstract class AbstractCoordinator implements
Closeable {
generation.memberId, coordinator, leaveReason);
LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(
rebalanceConfig.groupId,
- Collections.singletonList(new
MemberIdentity().setMemberId(generation.memberId).setReason(leaveReason))
+ Collections.singletonList(new
MemberIdentity().setMemberId(generation.memberId).setReason(JoinGroupRequest.maybeTruncateReason(leaveReason)))
);
future = client.send(coordinator, request).compose(new
LeaveGroupResponseHandler(generation));
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 220a59d1834..774506357bb 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -70,6 +70,20 @@ public class JoinGroupRequest extends AbstractRequest {
});
}
+ /**
+ * Ensures that the provided {@code reason} remains within a range of 255
chars.
+ * @param reason This is the reason that is sent to the broker over the
wire
+ * as a part of {@code JoinGroupRequest} or {@code
LeaveGroupRequest} messages.
+ * @return a provided reason as is or truncated reason if it exceeds the
255 chars threshold.
+ */
+ public static String maybeTruncateReason(final String reason) {
+ if (reason.length() > 255) {
+ return reason.substring(0, 255);
+ } else {
+ return reason;
+ }
+ }
+
public JoinGroupRequest(JoinGroupRequestData data, short version) {
super(ApiKeys.JOIN_GROUP, version);
this.data = data;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 523a961f964..61a2aaa00b2 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -4117,6 +4117,13 @@ public class KafkaAdminClientTest {
testRemoveMembersFromGroup("testing remove members reason", "testing
remove members reason");
}
+ @Test
+ public void testRemoveMembersFromGroupTruncatesReason() throws Exception {
+ final String reason = "Very
looooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooong
reason that is 271 characters long to make sure that length limit logic
handles the scenario nicely";
+ final String truncatedReason = reason.substring(0, 255);
+ testRemoveMembersFromGroup(reason, truncatedReason);
+ }
+
@Test
public void testRemoveMembersFromGroupDefaultReason() throws Exception {
testRemoveMembersFromGroup(null, DEFAULT_LEAVE_GROUP_REASON);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 6812af29ce5..ddbebb6dde6 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -571,6 +571,15 @@ public class AbstractCoordinatorTest {
expectSyncGroup(generation, memberId);
ensureActiveGroup(generation, memberId);
assertEquals("", coordinator.rejoinReason());
+
+ // check limit length of reason field
+ final String reason = "Very
looooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooong
reason that is 271 characters long to make sure that length limit logic
handles the scenario nicely";
+ final String truncatedReason = reason.substring(0, 255);
+ expectJoinGroup(memberId, truncatedReason, generation, memberId);
+ expectSyncGroup(generation, memberId);
+ coordinator.requestRejoin(reason);
+ ensureActiveGroup(generation, memberId);
+ assertEquals("", coordinator.rejoinReason());
}
private void ensureActiveGroup(
@@ -1159,6 +1168,19 @@ public class AbstractCoordinatorTest {
assertTrue(leaveGroupFuture.succeeded());
}
+ @Test
+ public void testHandleNormalLeaveGroupResponseAndTruncatedLeaveReason() {
+ MemberResponse memberResponse = new MemberResponse()
+ .setMemberId(memberId)
+ .setErrorCode(Errors.NONE.code());
+ LeaveGroupResponse response =
+ leaveGroupResponse(Collections.singletonList(memberResponse));
+ String leaveReason = "Very
looooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooong
leaveReason that is 271 characters long to make sure that length limit logic
handles the scenario nicely";
+ RequestFuture<Void> leaveGroupFuture = setupLeaveGroup(response,
leaveReason, leaveReason.substring(0, 255));
+ assertNotNull(leaveGroupFuture);
+ assertTrue(leaveGroupFuture.succeeded());
+ }
+
@Test
public void testHandleMultipleMembersLeaveGroupResponse() {
MemberResponse memberResponse = new MemberResponse()
@@ -1193,6 +1215,12 @@ public class AbstractCoordinatorTest {
}
private RequestFuture<Void> setupLeaveGroup(LeaveGroupResponse
leaveGroupResponse) {
+ return setupLeaveGroup(leaveGroupResponse, "test maybe leave group",
"test maybe leave group");
+ }
+
+ private RequestFuture<Void> setupLeaveGroup(LeaveGroupResponse
leaveGroupResponse,
+ String leaveReason,
+ String expectedLeaveReason) {
setupCoordinator(RETRY_BACKOFF_MS, Integer.MAX_VALUE,
Optional.empty());
mockClient.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
@@ -1204,11 +1232,11 @@ public class AbstractCoordinatorTest {
}
LeaveGroupRequestData leaveGroupRequest = ((LeaveGroupRequest)
body).data();
return
leaveGroupRequest.members().get(0).memberId().equals(memberId) &&
- leaveGroupRequest.members().get(0).reason().equals("test
maybe leave group");
+
leaveGroupRequest.members().get(0).reason().equals(expectedLeaveReason);
}, leaveGroupResponse);
coordinator.ensureActiveGroup();
- return coordinator.maybeLeaveGroup("test maybe leave group");
+ return coordinator.maybeLeaveGroup(leaveReason);
}
@Test