This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 4e7cc335ee KAFKA-13828; Ensure reasons sent by the consumer are small (#12043)
4e7cc335ee is described below

commit 4e7cc335ee007c90d8fc7b3b61f42b1de81f98af
Author: David Jacot <[email protected]>
AuthorDate: Wed Apr 13 13:42:27 2022 +0200

    KAFKA-13828; Ensure reasons sent by the consumer are small (#12043)
    
    This PR reworks the reasons used in the ConsumerCoordinator to ensure that they remain reasonably short.
    
    Reviewers: Bruno Cadonna <[email protected]>
---
 .../consumer/internals/AbstractCoordinator.java    | 23 +++++++++++++++++-----
 .../consumer/internals/ConsumerCoordinator.java    | 16 +++++++--------
 2 files changed, 26 insertions(+), 13 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 5b9712f346..b2f944ad5d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -1022,15 +1022,28 @@ public abstract class AbstractCoordinator implements Closeable {
         resetStateAndRejoin("consumer pro-actively leaving the group", true);
     }
 
-    public synchronized void requestRejoinIfNecessary(final String reason) {
+    public synchronized void requestRejoinIfNecessary(final String shortReason,
+                                                      final String fullReason) 
{
         if (!this.rejoinNeeded) {
-            requestRejoin(reason);
+            requestRejoin(shortReason, fullReason);
         }
     }
 
-    public synchronized void requestRejoin(final String reason) {
-        log.info("Request joining group due to: {}", reason);
-        this.rejoinReason = reason;
+    public synchronized void requestRejoin(final String shortReason) {
+        requestRejoin(shortReason, shortReason);
+    }
+
+    /**
+     * Request to rejoin the group.
+     *
+     * @param shortReason This is the reason passed up to the group 
coordinator. It must be
+     *                    reasonably small.
+     * @param fullReason This is the reason logged locally.
+     */
+    public synchronized void requestRejoin(final String shortReason,
+                                           final String fullReason) {
+        log.info("Request joining group due to: {}", fullReason);
+        this.rejoinReason = shortReason;
         this.rejoinNeeded = true;
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 10939b2a0e..51fa0b62ed 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -401,10 +401,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         assignedPartitions.addAll(assignment.partitions());
 
         if 
(!subscriptions.checkAssignmentMatchedSubscription(assignedPartitions)) {
-            final String reason = String.format("received assignment %s does 
not match the current subscription %s; " +
+            final String fullReason = String.format("received assignment %s 
does not match the current subscription %s; " +
                     "it is likely that the subscription has changed since we 
joined the group, will re-join with current subscription",
                     assignment.partitions(), subscriptions.prettyString());
-            requestRejoin(reason);
+            requestRejoin("received assignment does not match the current 
subscription", fullReason);
 
             return;
         }
@@ -437,9 +437,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 firstException.compareAndSet(null, 
invokePartitionsRevoked(revokedPartitions));
 
                 // If revoked any partitions, need to re-join the group 
afterwards
-                final String reason = String.format("need to revoke partitions 
%s as indicated " +
+                final String fullReason = String.format("need to revoke 
partitions %s as indicated " +
                         "by the current assignment and re-join", 
revokedPartitions);
-                requestRejoin(reason);
+                requestRejoin("need to revoke partitions and re-join", 
fullReason);
             }
         }
 
@@ -851,17 +851,17 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         // we need to rejoin if we performed the assignment and metadata has 
changed;
         // also for those owned-but-no-longer-existed partitions we should 
drop them as lost
         if (assignmentSnapshot != null && 
!assignmentSnapshot.matches(metadataSnapshot)) {
-            final String reason = String.format("cached metadata has changed 
from %s at the beginning of the rebalance to %s",
+            final String fullReason = String.format("cached metadata has 
changed from %s at the beginning of the rebalance to %s",
                 assignmentSnapshot, metadataSnapshot);
-            requestRejoinIfNecessary(reason);
+            requestRejoinIfNecessary("cached metadata has changed", 
fullReason);
             return true;
         }
 
         // we need to join if our subscription has changed since the last join
         if (joinedSubscription != null && 
!joinedSubscription.equals(subscriptions.subscription())) {
-            final String reason = String.format("subscription has changed from 
%s at the beginning of the rebalance to %s",
+            final String fullReason = String.format("subscription has changed 
from %s at the beginning of the rebalance to %s",
                 joinedSubscription, subscriptions.subscription());
-            requestRejoinIfNecessary(reason);
+            requestRejoinIfNecessary("subscription has changed", fullReason);
             return true;
         }
 

Reply via email to