This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 08a4cda [MINOR] Improve consumer logging on LeaveGroup (#5420)
08a4cda is described below
commit 08a4cda34e7535ead76d2a65826ff9e09b8bdd4f
Author: Dhruvil Shah <[email protected]>
AuthorDate: Sat Jul 28 07:48:01 2018 -0700
[MINOR] Improve consumer logging on LeaveGroup (#5420)
* Improve consumer logging on LeaveGroup
* Add GroupCoordinator logging, and address review comments
Reviewers: Guozhang Wang <[email protected]>
---
.../clients/consumer/CommitFailedException.java | 2 +-
.../kafka/clients/consumer/KafkaConsumer.java | 2 +-
.../consumer/internals/AbstractCoordinator.java | 8 +++++++-
.../kafka/coordinator/group/GroupCoordinator.scala | 26 +++++++++++-----------
4 files changed, 22 insertions(+), 16 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java
index c6006b7..cae5b2a 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java
@@ -33,7 +33,7 @@ public class CommitFailedException extends KafkaException {
"rebalanced and assigned the partitions to another member.
This means that the time " +
"between subsequent calls to poll() was longer than the
configured max.poll.interval.ms, " +
"which typically implies that the poll loop is spending too
much time message processing. " +
- "You can address this either by increasing the session timeout
or by reducing the maximum " +
+ "You can address this either by increasing
max.poll.interval.ms or by reducing the maximum " +
"size of batches returned in poll() with max.poll.records.");
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index fb37fee..651bd79 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1018,7 +1018,7 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
public void unsubscribe() {
acquireAndEnsureOpen();
try {
- log.debug("Unsubscribed all topics or patterns and assigned
partitions");
+ log.info("Unsubscribed all topics or patterns and assigned
partitions");
this.subscriptions.unsubscribe();
this.coordinator.maybeLeaveGroup();
this.metadata.needMetadataForAllTopics(false);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 53834fb..f9e1c18 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -803,7 +803,7 @@ public abstract class AbstractCoordinator implements
Closeable {
if (!coordinatorUnknown() && state != MemberState.UNJOINED &&
generation != Generation.NO_GENERATION) {
// this is a minimal effort attempt to leave the group. we do not
// attempt any resending if the request fails or times out.
- log.debug("Sending LeaveGroup request to coordinator {}",
coordinator);
+ log.info("Sending LeaveGroup request to coordinator {}",
coordinator);
LeaveGroupRequest.Builder request =
new LeaveGroupRequest.Builder(groupId,
generation.memberId);
client.send(coordinator, request)
@@ -1032,6 +1032,12 @@ public abstract class AbstractCoordinator implements
Closeable {
} else if (heartbeat.pollTimeoutExpired(now)) {
// the poll timeout has expired, which means that
the foreground thread has stalled
// in between calls to poll(), so we explicitly
leave the group.
+ log.warn("This member will leave the group because
consumer poll timeout has expired. This " +
+ "means the time between subsequent calls
to poll() was longer than the configured " +
+ "max.poll.interval.ms, which typically
implies that the poll loop is spending too " +
+ "much time processing messages. You can
address this either by increasing " +
+ "max.poll.interval.ms or by reducing the
maximum size of batches returned in poll() " +
+ "with max.poll.records.");
maybeLeaveGroup();
} else if (!heartbeat.shouldHeartbeat(now)) {
// poll again after waiting for the retry backoff
in case the heartbeat failed or the
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 2cedacd..6ca443f 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -291,7 +291,7 @@ class GroupCoordinator(val brokerId: Int,
if (group.is(CompletingRebalance) && generationId ==
group.generationId) {
if (error != Errors.NONE) {
resetAndPropagateAssignmentError(group, error)
- maybePrepareRebalance(group)
+ maybePrepareRebalance(group, s"error when storing group
assignment during SyncGroup (member: $memberId)")
} else {
setAndPropagateAssignment(group, assignment)
group.transitionTo(Stable)
@@ -333,7 +333,7 @@ class GroupCoordinator(val brokerId: Int,
val member = group.get(memberId)
removeHeartbeatForLeavingMember(group, member)
debug(s"Member ${member.memberId} in group ${group.groupId} has
left, removing it from the group")
- removeMemberAndUpdateGroup(group, member)
+ removeMemberAndUpdateGroup(group, member, s"removing member
$memberId on LeaveGroup")
responseCallback(Errors.NONE)
}
}
@@ -700,7 +700,7 @@ class GroupCoordinator(val brokerId: Int,
protocolType: String,
protocols: List[(String, Array[Byte])],
group: GroupMetadata,
- callback: JoinCallback) = {
+ callback: JoinCallback): MemberMetadata = {
val memberId = clientId + "-" + group.generateMemberIdSuffix
val member = new MemberMetadata(memberId, group.groupId, clientId,
clientHost, rebalanceTimeoutMs,
sessionTimeoutMs, protocolType, protocols)
@@ -710,7 +710,7 @@ class GroupCoordinator(val brokerId: Int,
group.newMemberAdded = true
group.add(member)
- maybePrepareRebalance(group)
+ maybePrepareRebalance(group, s"Adding new member $memberId")
member
}
@@ -720,17 +720,17 @@ class GroupCoordinator(val brokerId: Int,
callback: JoinCallback) {
member.supportedProtocols = protocols
member.awaitingJoinCallback = callback
- maybePrepareRebalance(group)
+ maybePrepareRebalance(group, s"Updating metadata for member
${member.memberId}")
}
- private def maybePrepareRebalance(group: GroupMetadata) {
+ private def maybePrepareRebalance(group: GroupMetadata, reason: String) {
group.inLock {
if (group.canRebalance)
- prepareRebalance(group)
+ prepareRebalance(group, reason)
}
}
- private def prepareRebalance(group: GroupMetadata) {
+ private def prepareRebalance(group: GroupMetadata, reason: String) {
// if any members are awaiting sync, cancel their request and have them
rejoin
if (group.is(CompletingRebalance))
resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)
@@ -747,18 +747,18 @@ class GroupCoordinator(val brokerId: Int,
group.transitionTo(PreparingRebalance)
- info(s"Preparing to rebalance group ${group.groupId} with old generation
${group.generationId} " +
- s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")
+ info(s"Preparing to rebalance group ${group.groupId} in state
${group.currentState} with old generation " +
+ s"${group.generationId}
(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)}) (reason:
$reason)")
val groupKey = GroupKey(group.groupId)
joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
}
- private def removeMemberAndUpdateGroup(group: GroupMetadata, member:
MemberMetadata) {
+ private def removeMemberAndUpdateGroup(group: GroupMetadata, member:
MemberMetadata, reason: String) {
group.remove(member.memberId)
group.currentState match {
case Dead | Empty =>
- case Stable | CompletingRebalance => maybePrepareRebalance(group)
+ case Stable | CompletingRebalance => maybePrepareRebalance(group, reason)
case PreparingRebalance =>
joinPurgatory.checkAndComplete(GroupKey(group.groupId))
}
}
@@ -837,7 +837,7 @@ class GroupCoordinator(val brokerId: Int,
group.inLock {
if (!shouldKeepMemberAlive(member, heartbeatDeadline)) {
info(s"Member ${member.memberId} in group ${group.groupId} has failed,
removing it from the group")
- removeMemberAndUpdateGroup(group, member)
+ removeMemberAndUpdateGroup(group, member, s"removing member
${member.memberId} on heartbeat expiration")
}
}
}