This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2a7bad8ca00 MINOR: Fix consumer log on fatal error & improve memberId
logging (#16720)
2a7bad8ca00 is described below
commit 2a7bad8ca00deddb6a70530f53fa59af99a25e66
Author: Lianet Magrans <[email protected]>
AuthorDate: Sat Aug 3 14:18:14 2024 -0400
MINOR: Fix consumer log on fatal error & improve memberId logging (#16720)
Fix log on consumer fatal error, to show member ID only if present. If no
member ID the log will clearly indicate that the member has no member ID
(instead of showing empty as it used to)
Apply same fix consistently to all other log lines that include member ID.
Reviewers: Kirk True <[email protected]>, PoAn Yang <[email protected]>,
TengYao Chi <[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../consumer/internals/MembershipManagerImpl.java | 63 +++++++++++-----------
.../internals/MembershipManagerImplTest.java | 11 ++++
2 files changed, 43 insertions(+), 31 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
index 70b3b492dcd..ea7ba84fda5 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
@@ -363,7 +363,7 @@ public class MembershipManagerImpl implements
MembershipManager {
metricsManager.recordRebalanceStarted(time.milliseconds());
}
- log.info("Member {} with epoch {} transitioned from {} to {}.",
memberId, memberEpoch, state, nextState);
+ log.info("Member {} with epoch {} transitioned from {} to {}.",
memberIdInfoForLog(), memberEpoch, state, nextState);
this.state = nextState;
}
@@ -423,17 +423,17 @@ public class MembershipManagerImpl implements
MembershipManager {
MemberState state = state();
if (state == MemberState.LEAVING) {
log.debug("Ignoring heartbeat response received from broker.
Member {} with epoch {} is " +
- "already leaving the group.", memberId, memberEpoch);
+ "already leaving the group.", memberIdInfoForLog(),
memberEpoch);
return;
}
if (state == MemberState.UNSUBSCRIBED &&
maybeCompleteLeaveInProgress()) {
log.debug("Member {} with epoch {} received a successful response
to the heartbeat " +
- "to leave the group and completed the leave operation. ",
memberId, memberEpoch);
+ "to leave the group and completed the leave operation. ",
memberIdInfoForLog(), memberEpoch);
return;
}
if (isNotInGroup()) {
log.debug("Ignoring heartbeat response received from broker.
Member {} is in {} state" +
- " so it's not a member of the group. ", memberId, state);
+ " so it's not a member of the group. ", memberIdInfoForLog(),
state);
return;
}
@@ -472,7 +472,7 @@ public class MembershipManagerImpl implements
MembershipManager {
// operation once the request completes, regardless of the response.
if (state == MemberState.UNSUBSCRIBED &&
maybeCompleteLeaveInProgress()) {
log.warn("Member {} with epoch {} received a failed response to
the heartbeat to " +
- "leave the group and completed the leave operation. ",
memberId, memberEpoch);
+ "leave the group and completed the leave operation. ",
memberIdInfoForLog(), memberEpoch);
}
}
@@ -550,7 +550,7 @@ public class MembershipManagerImpl implements
MembershipManager {
if (state == MemberState.PREPARE_LEAVING) {
log.info("Member {} with epoch {} got fenced but it is already
preparing to leave " +
"the group, so it will stop sending heartbeat and won't
attempt to send the " +
- "leave request or rejoin.", memberId, memberEpoch);
+ "leave request or rejoin.", memberIdInfoForLog(),
memberEpoch);
// Briefly transition to LEAVING to ensure all required actions
are applied even
// though there is no need to send a leave group heartbeat (ex.
clear epoch and
// notify epoch listeners). Then transition to UNSUBSCRIBED,
ensuring that the member
@@ -564,20 +564,20 @@ public class MembershipManagerImpl implements
MembershipManager {
if (state == MemberState.LEAVING) {
log.debug("Member {} with epoch {} got fenced before sending leave
group heartbeat. " +
- "It will not send the leave request and won't attempt to
rejoin.", memberId, memberEpoch);
+ "It will not send the leave request and won't attempt to
rejoin.", memberIdInfoForLog(), memberEpoch);
transitionTo(MemberState.UNSUBSCRIBED);
maybeCompleteLeaveInProgress();
return;
}
if (state == MemberState.UNSUBSCRIBED) {
log.debug("Member {} with epoch {} got fenced but it already left
the group, so it " +
- "won't attempt to rejoin.", memberId, memberEpoch);
+ "won't attempt to rejoin.", memberIdInfoForLog(),
memberEpoch);
return;
}
transitionTo(MemberState.FENCED);
resetEpoch();
log.debug("Member {} with epoch {} transitioned to {} state. It will
release its " +
- "assignment and rejoin the group.", memberId, memberEpoch,
MemberState.FENCED);
+ "assignment and rejoin the group.", memberIdInfoForLog(),
memberEpoch, MemberState.FENCED);
// Release assignment
CompletableFuture<Void> callbackResult =
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
@@ -603,23 +603,19 @@ public class MembershipManagerImpl implements
MembershipManager {
public void transitionToFatal() {
MemberState previousState = state;
transitionTo(MemberState.FATAL);
- if (memberId.isEmpty()) {
- log.error("Member {} with epoch {} transitioned to {} state",
memberId, memberEpoch, MemberState.FATAL);
- } else {
- log.error("Non-member transitioned to {} state",
MemberState.FATAL);
- }
+ log.error("Member {} with epoch {} transitioned to fatal state",
memberIdInfoForLog(), memberEpoch);
notifyEpochChange(Optional.empty(), Optional.empty());
if (previousState == MemberState.UNSUBSCRIBED) {
log.debug("Member {} with epoch {} got fatal error from the broker
but it already " +
- "left the group, so onPartitionsLost callback won't be
triggered.", memberId, memberEpoch);
+ "left the group, so onPartitionsLost callback won't be
triggered.", memberIdInfoForLog(), memberEpoch);
return;
}
if (previousState == MemberState.LEAVING || previousState ==
MemberState.PREPARE_LEAVING) {
log.info("Member {} with epoch {} was leaving the group with state
{} when it got a " +
"fatal error from the broker. It will discard the ongoing
leave and remain in " +
- "fatal state.", memberId, memberEpoch, previousState);
+ "fatal state.", memberIdInfoForLog(), memberEpoch,
previousState);
maybeCompleteLeaveInProgress();
return;
}
@@ -635,6 +631,11 @@ public class MembershipManagerImpl implements
MembershipManager {
});
}
+ // Visible for testing
+ String memberIdInfoForLog() {
+ return (memberId == null || memberId.isEmpty()) ? "<no ID>" : memberId;
+ }
+
/**
* {@inheritDoc}
*/
@@ -707,7 +708,7 @@ public class MembershipManagerImpl implements
MembershipManager {
if (state == MemberState.PREPARE_LEAVING || state ==
MemberState.LEAVING) {
// Member already leaving. No-op and return existing leave group
future that will
// complete when the ongoing leave operation completes.
- log.debug("Leave group operation already in progress for member
{}", memberId);
+ log.debug("Leave group operation already in progress for member
{}", memberIdInfoForLog());
return leaveGroupInProgress.get();
}
@@ -719,10 +720,10 @@ public class MembershipManagerImpl implements
MembershipManager {
callbackResult.whenComplete((result, error) -> {
if (error != null) {
log.error("Member {} callback to release assignment failed. It
will proceed " +
- "to clear its assignment and send a leave group
heartbeat", memberId, error);
+ "to clear its assignment and send a leave group
heartbeat", memberIdInfoForLog(), error);
} else {
log.info("Member {} completed callback to release assignment.
It will proceed " +
- "to clear its assignment and send a leave group
heartbeat", memberId);
+ "to clear its assignment and send a leave group
heartbeat", memberIdInfoForLog());
}
// Clear the subscription, no matter if the callback execution
failed or succeeded.
subscriptions.unsubscribe();
@@ -758,7 +759,7 @@ public class MembershipManagerImpl implements
MembershipManager {
droppedPartitions.addAll(subscriptions.assignedPartitions());
log.info("Member {} is triggering callbacks to release assignment {}
and leave group",
- memberId, droppedPartitions);
+ memberIdInfoForLog(), droppedPartitions);
CompletableFuture<Void> callbackResult;
if (droppedPartitions.isEmpty()) {
@@ -790,12 +791,12 @@ public class MembershipManagerImpl implements
MembershipManager {
public void transitionToSendingLeaveGroup(boolean dueToExpiredPollTimer) {
if (state == MemberState.FATAL) {
log.warn("Member {} with epoch {} won't send leave group request
because it is in " +
- "FATAL state", memberId, memberEpoch);
+ "FATAL state", memberIdInfoForLog(), memberEpoch);
return;
}
if (state == MemberState.UNSUBSCRIBED) {
log.warn("Member {} won't send leave group request because it is
already out of the group.",
- memberId);
+ memberIdInfoForLog());
return;
}
@@ -844,17 +845,17 @@ public class MembershipManagerImpl implements
MembershipManager {
} else {
log.debug("Member {} with epoch {} transitioned to {} after a
heartbeat was sent " +
"to ack a previous reconciliation. New assignments are
ready to " +
- "be reconciled.", memberId, memberEpoch,
MemberState.RECONCILING);
+ "be reconciled.", memberIdInfoForLog(), memberEpoch,
MemberState.RECONCILING);
transitionTo(MemberState.RECONCILING);
}
} else if (state == MemberState.LEAVING) {
if (isPollTimerExpired) {
- log.debug("Member {} with epoch {} generated the heartbeat to
leave due to expired poll timer. It will " +
+ log.debug("Member {} with epoch {} generated the heartbeat to
leave due to expired poll timer. It will " +
"remain stale (no heartbeat) until it rejoins the group on
the next consumer " +
- "poll.", memberId, memberEpoch);
+ "poll.", memberIdInfoForLog(), memberEpoch);
transitionToStale();
} else {
- log.debug("Member {} with epoch {} generated the heartbeat to
leave the group.", memberId, memberEpoch);
+ log.debug("Member {} with epoch {} generated the heartbeat to
leave the group.", memberIdInfoForLog(), memberEpoch);
transitionTo(MemberState.UNSUBSCRIBED);
}
}
@@ -868,7 +869,7 @@ public class MembershipManagerImpl implements
MembershipManager {
if (state == MemberState.LEAVING) {
log.warn("Heartbeat to leave group cannot be sent (most probably
due to coordinator " +
"not known/available). Member {} with epoch {} will
transition to {}.",
- memberId, memberEpoch, MemberState.UNSUBSCRIBED);
+ memberIdInfoForLog(), memberEpoch, MemberState.UNSUBSCRIBED);
transitionTo(MemberState.UNSUBSCRIBED);
maybeCompleteLeaveInProgress();
}
@@ -909,7 +910,7 @@ public class MembershipManagerImpl implements
MembershipManager {
isPollTimerExpired = false;
if (state == MemberState.STALE) {
log.debug("Expired poll timer has been reset so stale member {}
will rejoin the group" +
- "when it completes releasing its previous assignment.",
memberId);
+ "when it completes releasing its previous assignment.",
memberIdInfoForLog());
staleMemberAssignmentRelease.whenComplete((__, error) ->
transitionToJoining());
}
}
@@ -933,7 +934,7 @@ public class MembershipManagerImpl implements
MembershipManager {
clearAssignment();
log.debug("Member {} sent leave group heartbeat and released its
assignment. It will remain " +
"in {} state until the poll timer is reset, and it will then
rejoin the group",
- memberId, MemberState.STALE);
+ memberIdInfoForLog(), MemberState.STALE);
});
}
@@ -1001,7 +1002,7 @@ public class MembershipManagerImpl implements
MembershipManager {
"\tAdded partitions (assigned - owned): {}\n" +
"\tRevoked partitions (owned - assigned): {}\n",
resolvedAssignment.localEpoch,
- memberId,
+ memberIdInfoForLog(),
assignedTopicPartitions,
ownedPartitions,
addedPartitions,
@@ -1259,7 +1260,7 @@ public class MembershipManagerImpl implements
MembershipManager {
if (state == MemberState.FATAL) {
String errorMsg = String.format("Member %s with epoch %s received
a fatal error " +
"while waiting for a revocation commit to complete. Will abort
revocation " +
- "without triggering user callback.", memberId, memberEpoch);
+ "without triggering user callback.", memberIdInfoForLog(),
memberEpoch);
log.debug(errorMsg);
revocationResult.completeExceptionally(new
KafkaException(errorMsg));
return revocationResult;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
index 7c6d486f81a..f53b11308ee 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
@@ -2353,6 +2353,17 @@ public class MembershipManagerImplTest {
assertEquals(-1d, getMetricValue(metrics,
rebalanceMetricsManager.lastRebalanceSecondsAgo));
}
+ @Test
+ public void testMemberIdInfoForLogs() {
+ MembershipManagerImpl membershipManager =
createMembershipManagerJoiningGroup(null, null);
+ assertTrue(membershipManager.memberId().isEmpty());
+ assertFalse(membershipManager.memberIdInfoForLog().isEmpty());
+
+ membershipManager = createMemberInStableState(null);
+ assertFalse(membershipManager.memberId().isEmpty());
+ assertEquals(membershipManager.memberId(),
membershipManager.memberIdInfoForLog());
+ }
+
private Object getMetricValue(Metrics metrics, MetricName name) {
return metrics.metrics().get(name).metricValue();
}