This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new ce86a54  KAFKA-13783; Remove reason prefixing in JoinGroupRequest and 
LeaveGroupRequest (#11971)
ce86a54 is described below

commit ce86a54bdc925ea5a62a7b0ba61fbe99300754ad
Author: David Jacot <[email protected]>
AuthorDate: Thu Mar 31 14:31:31 2022 +0200

    KAFKA-13783; Remove reason prefixing in JoinGroupRequest and 
LeaveGroupRequest (#11971)
    
    KIP-800 introduced a mechanism to pass a reason in the join group request 
and in the leave group request. A default reason is used unless one is provided 
by the user. In this case, the custom reason is prefixed by the default one.
    
    When we tried to used this in Kafka Streams, we noted a significant 
degradation of the performances, see 
https://github.com/apache/kafka/pull/11873. It is not clear wether the 
prefixing is the root cause of the issue or not. To be on the safe side, I 
think that we should remove the prefixing. It does not bring much anyway as we 
are still able to distinguish a custom reason from the default one on the 
broker side.
    
    This patch removes prefixing the user provided reasons. So if a the user 
provides a reason, the reason is used directly. If the reason is empty or null, 
the default reason is used.
    
    Reviewers: Luke Chen <[email protected]>, <[email protected]>, Hao Li 
<[email protected]>
---
 .../kafka/clients/admin/KafkaAdminClient.java      | 26 +++++----
 .../apache/kafka/clients/admin/MemberToRemove.java |  4 +-
 .../kafka/clients/consumer/KafkaConsumer.java      |  4 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 31 +++++++----
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 61 ++++++++++++++++++++++
 5 files changed, 101 insertions(+), 25 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 03322fd..cf99556 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -306,9 +306,9 @@ public class KafkaAdminClient extends AdminClient {
     private static final long INVALID_SHUTDOWN_TIME = -1;
 
     /**
-     * The base reason for a LeaveGroupRequest
+     * The default reason for a LeaveGroupRequest.
      */
-    static final String LEAVE_GROUP_REASON = "member was removed by an admin";
+    static final String DEFAULT_LEAVE_GROUP_REASON = "member was removed by an 
admin";
 
     /**
      * Thread name prefix for admin client network thread
@@ -3713,7 +3713,7 @@ public class KafkaAdminClient extends AdminClient {
         }
     }
 
-    private List<MemberIdentity> getMembersFromGroup(String groupId) {
+    private List<MemberIdentity> getMembersFromGroup(String groupId, String 
reason) {
         Collection<MemberDescription> members;
         try {
             members = 
describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).get().members();
@@ -3723,11 +3723,15 @@ public class KafkaAdminClient extends AdminClient {
 
         List<MemberIdentity> membersToRemove = new ArrayList<>();
         for (final MemberDescription member : members) {
+            MemberIdentity memberIdentity = new 
MemberIdentity().setReason(reason);
+
             if (member.groupInstanceId().isPresent()) {
-                membersToRemove.add(new 
MemberIdentity().setGroupInstanceId(member.groupInstanceId().get()));
+                
memberIdentity.setGroupInstanceId(member.groupInstanceId().get());
             } else {
-                membersToRemove.add(new 
MemberIdentity().setMemberId(member.consumerId()));
+                memberIdentity.setMemberId(member.consumerId());
             }
+
+            membersToRemove.add(memberIdentity);
         }
         return membersToRemove;
     }
@@ -3735,15 +3739,17 @@ public class KafkaAdminClient extends AdminClient {
     @Override
     public RemoveMembersFromConsumerGroupResult 
removeMembersFromConsumerGroup(String groupId,
                                                                                
RemoveMembersFromConsumerGroupOptions options) {
+        String reason = options.reason() == null || options.reason().isEmpty() 
?
+            DEFAULT_LEAVE_GROUP_REASON : options.reason();
+
         List<MemberIdentity> members;
         if (options.removeAll()) {
-            members = getMembersFromGroup(groupId);
+            members = getMembersFromGroup(groupId, reason);
         } else {
-            members = 
options.members().stream().map(MemberToRemove::toMemberIdentity).collect(Collectors.toList());
+            members = options.members().stream()
+                .map(m -> m.toMemberIdentity().setReason(reason))
+                .collect(Collectors.toList());
         }
-        
-        String reason = options.reason() == null ? LEAVE_GROUP_REASON : 
LEAVE_GROUP_REASON + ": " + options.reason();
-        members.forEach(member -> member.setReason(reason));
 
         SimpleAdminApiFuture<CoordinatorKey, Map<MemberIdentity, Errors>> 
future =
                 RemoveMembersFromConsumerGroupHandler.newFuture(groupId);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/MemberToRemove.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/MemberToRemove.java
index 4c7b16b..5ca5463 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/MemberToRemove.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/MemberToRemove.java
@@ -48,8 +48,8 @@ public class MemberToRemove {
 
     MemberIdentity toMemberIdentity() {
         return new MemberIdentity()
-                   .setGroupInstanceId(groupInstanceId)
-                   .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID);
+            .setGroupInstanceId(groupInstanceId)
+            .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID);
     }
 
     public String groupInstanceId() {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 0fd4ea9..d0ef1a0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -563,7 +563,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private static final long NO_CURRENT_THREAD = -1L;
     private static final String JMX_PREFIX = "kafka.consumer";
     static final long DEFAULT_CLOSE_TIMEOUT_MS = 30 * 1000;
-    private static final String DEFAULT_REASON = "rebalance enforced by user";
+    static final String DEFAULT_REASON = "rebalance enforced by user";
 
     // Visible for testing
     final Metrics metrics;
@@ -2322,7 +2322,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
             if (coordinator == null) {
                 throw new IllegalStateException("Tried to force a rebalance 
but consumer does not have a group.");
             }
-            coordinator.requestRejoin(reason == null ? DEFAULT_REASON : 
DEFAULT_REASON + ": " + reason);
+            coordinator.requestRejoin(reason == null || reason.isEmpty() ? 
DEFAULT_REASON : reason);
         } finally {
             release();
         }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 3d7bb94..c337831 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -236,7 +236,7 @@ import static java.util.Collections.emptyList;
 import static java.util.Collections.emptySet;
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
-import static 
org.apache.kafka.clients.admin.KafkaAdminClient.LEAVE_GROUP_REASON;
+import static 
org.apache.kafka.clients.admin.KafkaAdminClient.DEFAULT_LEAVE_GROUP_REASON;
 import static 
org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse;
 import static 
org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
 import static 
org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
@@ -3947,35 +3947,44 @@ public class KafkaAdminClientTest {
                 LeaveGroupRequestData leaveGroupRequest = ((LeaveGroupRequest) 
body).data();
 
                 return leaveGroupRequest.members().stream().allMatch(
-                        member -> member.reason().equals(expectedReason)
+                    member -> member.reason().equals(expectedReason)
                 );
             }, new LeaveGroupResponse(new 
LeaveGroupResponseData().setErrorCode(Errors.NONE.code()).setMembers(
-                    Arrays.asList(
-                            new 
MemberResponse().setGroupInstanceId("instance-1"),
-                            new 
MemberResponse().setGroupInstanceId("instance-2")
-                    ))
+                Arrays.asList(
+                    new MemberResponse().setGroupInstanceId("instance-1"),
+                    new MemberResponse().setGroupInstanceId("instance-2")
+                ))
             ));
 
-            Collection<MemberToRemove> membersToRemove = Arrays.asList(new 
MemberToRemove("instance-1"), new MemberToRemove("instance-2"));
+            MemberToRemove memberToRemove1 = new MemberToRemove("instance-1");
+            MemberToRemove memberToRemove2 = new MemberToRemove("instance-2");
 
-            RemoveMembersFromConsumerGroupOptions options = new 
RemoveMembersFromConsumerGroupOptions(membersToRemove);
+            RemoveMembersFromConsumerGroupOptions options = new 
RemoveMembersFromConsumerGroupOptions(Arrays.asList(
+                memberToRemove1,
+                memberToRemove2
+            ));
             options.reason(reason);
 
             final RemoveMembersFromConsumerGroupResult result = 
env.adminClient().removeMembersFromConsumerGroup(
-                    GROUP_ID, options);
+                GROUP_ID,
+                options
+            );
 
             assertNull(result.all().get());
+            assertNull(result.memberResult(memberToRemove1).get());
+            assertNull(result.memberResult(memberToRemove2).get());
         }
     }
 
     @Test
     public void testRemoveMembersFromGroupReason() throws Exception {
-        testRemoveMembersFromGroup("testing remove members reason", 
LEAVE_GROUP_REASON + ": testing remove members reason");
+        testRemoveMembersFromGroup("testing remove members reason", "testing 
remove members reason");
     }
 
     @Test
     public void testRemoveMembersFromGroupDefaultReason() throws Exception {
-        testRemoveMembersFromGroup(null, LEAVE_GROUP_REASON);
+        testRemoveMembersFromGroup(null, DEFAULT_LEAVE_GROUP_REASON);
+        testRemoveMembersFromGroup("", DEFAULT_LEAVE_GROUP_REASON);
     }
 
     @Test
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 27c108b..ae54efc 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -138,6 +138,7 @@ import java.util.stream.Stream;
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
+import static org.apache.kafka.clients.consumer.KafkaConsumer.DEFAULT_REASON;
 import static 
org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -2820,6 +2821,66 @@ public class KafkaConsumerTest {
     }
 
     @Test
+    public void testEnforceRebalanceReason() {
+        Time time = new MockTime(1L);
+
+        ConsumerMetadata metadata = createMetadata(subscription);
+        MockClient client = new MockClient(time, metadata);
+        initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1)));
+        Node node = metadata.fetch().nodes().get(0);
+
+        KafkaConsumer<String, String> consumer = newConsumer(
+            time,
+            client,
+            subscription,
+            metadata,
+            assignor,
+            true,
+            groupInstanceId
+        );
+        consumer.subscribe(Collections.singletonList(topic));
+
+        // Lookup coordinator.
+        
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node), node);
+        consumer.poll(Duration.ZERO);
+
+        // Initial join sends an empty reason.
+        prepareJoinGroupAndVerifyReason(client, node, "");
+        consumer.poll(Duration.ZERO);
+
+        // A null reason should be replaced by the default reason.
+        consumer.enforceRebalance(null);
+        prepareJoinGroupAndVerifyReason(client, node, DEFAULT_REASON);
+        consumer.poll(Duration.ZERO);
+
+        // An empty reason should be replaced by the default reason.
+        consumer.enforceRebalance("");
+        prepareJoinGroupAndVerifyReason(client, node, DEFAULT_REASON);
+        consumer.poll(Duration.ZERO);
+
+        // A non-null and non-empty reason is sent as-is.
+        String customReason = "user provided reason";
+        consumer.enforceRebalance(customReason);
+        prepareJoinGroupAndVerifyReason(client, node, customReason);
+        consumer.poll(Duration.ZERO);
+    }
+
+    private void prepareJoinGroupAndVerifyReason(
+        MockClient client,
+        Node node,
+        String expectedReason
+    ) {
+        client.prepareResponseFrom(
+            body -> {
+                JoinGroupRequest joinGroupRequest = (JoinGroupRequest) body;
+                return expectedReason.equals(joinGroupRequest.data().reason());
+            },
+            joinGroupFollowerResponse(assignor, 1, memberId, leaderId, 
Errors.NONE),
+            node
+        );
+    }
+
+    @Test
     public void configurableObjectsShouldSeeGeneratedClientId() {
         Properties props = new Properties();
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");

Reply via email to