This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 4e7cc335ee KAFKA-13828; Ensure reasons sent by the consumer are small
(#12043)
4e7cc335ee is described below
commit 4e7cc335ee007c90d8fc7b3b61f42b1de81f98af
Author: David Jacot <[email protected]>
AuthorDate: Wed Apr 13 13:42:27 2022 +0200
KAFKA-13828; Ensure reasons sent by the consumer are small (#12043)
This PR reworks the reasons used in the ConsumerCoordinator to ensure that
they remain reasonably short.
Reviewers: Bruno Cadonna <[email protected]>
---
.../consumer/internals/AbstractCoordinator.java | 23 +++++++++++++++++-----
.../consumer/internals/ConsumerCoordinator.java | 16 +++++++--------
2 files changed, 26 insertions(+), 13 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 5b9712f346..b2f944ad5d 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -1022,15 +1022,28 @@ public abstract class AbstractCoordinator implements
Closeable {
resetStateAndRejoin("consumer pro-actively leaving the group", true);
}
- public synchronized void requestRejoinIfNecessary(final String reason) {
+ public synchronized void requestRejoinIfNecessary(final String shortReason,
+ final String fullReason)
{
if (!this.rejoinNeeded) {
- requestRejoin(reason);
+ requestRejoin(shortReason, fullReason);
}
}
- public synchronized void requestRejoin(final String reason) {
- log.info("Request joining group due to: {}", reason);
- this.rejoinReason = reason;
+ public synchronized void requestRejoin(final String shortReason) {
+ requestRejoin(shortReason, shortReason);
+ }
+
+ /**
+ * Request to rejoin the group.
+ *
+ * @param shortReason This is the reason passed up to the group
coordinator. It must be
+ * reasonably small.
+ * @param fullReason This is the reason logged locally.
+ */
+ public synchronized void requestRejoin(final String shortReason,
+ final String fullReason) {
+ log.info("Request joining group due to: {}", fullReason);
+ this.rejoinReason = shortReason;
this.rejoinNeeded = true;
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 10939b2a0e..51fa0b62ed 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -401,10 +401,10 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
assignedPartitions.addAll(assignment.partitions());
if
(!subscriptions.checkAssignmentMatchedSubscription(assignedPartitions)) {
- final String reason = String.format("received assignment %s does
not match the current subscription %s; " +
+ final String fullReason = String.format("received assignment %s
does not match the current subscription %s; " +
"it is likely that the subscription has changed since we
joined the group, will re-join with current subscription",
assignment.partitions(), subscriptions.prettyString());
- requestRejoin(reason);
+ requestRejoin("received assignment does not match the current
subscription", fullReason);
return;
}
@@ -437,9 +437,9 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
firstException.compareAndSet(null,
invokePartitionsRevoked(revokedPartitions));
// If revoked any partitions, need to re-join the group
afterwards
- final String reason = String.format("need to revoke partitions
%s as indicated " +
+ final String fullReason = String.format("need to revoke
partitions %s as indicated " +
"by the current assignment and re-join",
revokedPartitions);
- requestRejoin(reason);
+ requestRejoin("need to revoke partitions and re-join",
fullReason);
}
}
@@ -851,17 +851,17 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
// we need to rejoin if we performed the assignment and metadata has
changed;
// also for those owned-but-no-longer-existed partitions we should
drop them as lost
if (assignmentSnapshot != null &&
!assignmentSnapshot.matches(metadataSnapshot)) {
- final String reason = String.format("cached metadata has changed
from %s at the beginning of the rebalance to %s",
+ final String fullReason = String.format("cached metadata has
changed from %s at the beginning of the rebalance to %s",
assignmentSnapshot, metadataSnapshot);
- requestRejoinIfNecessary(reason);
+ requestRejoinIfNecessary("cached metadata has changed",
fullReason);
return true;
}
// we need to join if our subscription has changed since the last join
if (joinedSubscription != null &&
!joinedSubscription.equals(subscriptions.subscription())) {
- final String reason = String.format("subscription has changed from
%s at the beginning of the rebalance to %s",
+ final String fullReason = String.format("subscription has changed
from %s at the beginning of the rebalance to %s",
joinedSubscription, subscriptions.subscription());
- requestRejoinIfNecessary(reason);
+ requestRejoinIfNecessary("subscription has changed", fullReason);
return true;
}