This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new bd462df MINOR: standardize rebalance related logging for easy discovery & debugging (#9295) bd462df is described below commit bd462df20321ff5b75a7e3eae70634268582d90b Author: A. Sophie Blee-Goldman <sop...@confluent.io> AuthorDate: Fri Sep 25 20:29:17 2020 -0700 MINOR: standardize rebalance related logging for easy discovery & debugging (#9295) Some minor logging adjustments to standardize the grammar of rebalance related messages and make it easy to query the logs for quick debugging results. Reviewers: Guozhang Wang <wangg...@gmail.com> --- .../consumer/internals/AbstractCoordinator.java | 52 ++++++++++++---------- .../consumer/internals/ConsumerCoordinator.java | 2 +- .../internals/StreamsPartitionAssignor.java | 4 +- 3 files changed, 32 insertions(+), 26 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 080d9c4..b021c91 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -444,7 +444,7 @@ public abstract class AbstractCoordinator implements Closeable { stateSnapshot = this.state; } - if (generationSnapshot != Generation.NO_GENERATION && stateSnapshot == MemberState.STABLE) { + if (!generationSnapshot.equals(Generation.NO_GENERATION) && stateSnapshot == MemberState.STABLE) { // Duplicate the buffer in case `onJoinComplete` does not complete and needs to be retried. 
ByteBuffer memberAssignment = future.value().duplicate(); @@ -563,7 +563,7 @@ public abstract class AbstractCoordinator implements Closeable { Errors error = joinResponse.error(); if (error == Errors.NONE) { if (isProtocolTypeInconsistent(joinResponse.data().protocolType())) { - log.error("JoinGroup failed due to inconsistent Protocol Type, received {} but expected {}", + log.error("JoinGroup failed: Inconsistent Protocol Type, received {} but expected {}", joinResponse.data().protocolType(), protocolType()); future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL); } else { @@ -598,11 +598,12 @@ public abstract class AbstractCoordinator implements Closeable { } } } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) { - log.debug("Attempt to join group rejected since coordinator {} is loading the group.", coordinator()); + log.info("JoinGroup failed: Coordinator {} is loading the group.", coordinator()); // backoff and retry future.raise(error); } else if (error == Errors.UNKNOWN_MEMBER_ID) { - log.debug("Attempt to join group failed due to unknown member id with {}.", sentGeneration); + log.info("JoinGroup failed: {} Need to re-join the group. Sent generation was {}", + error.message(), sentGeneration); // only need to reset the member id if generation has not been changed, // then retry immediately if (generationUnchanged()) @@ -613,13 +614,14 @@ public abstract class AbstractCoordinator implements Closeable { || error == Errors.NOT_COORDINATOR) { // re-discover the coordinator and retry with backoff markCoordinatorUnknown(); - log.debug("Attempt to join group failed due to obsolete coordinator information: {}", error.message()); + log.info("JoinGroup failed: {} Marking coordinator unknown. 
Sent generation was {}", + error.message(), sentGeneration); future.raise(error); } else if (error == Errors.FENCED_INSTANCE_ID) { // for join-group request, even if the generation has changed we would not expect the instance id // gets fenced, and hence we always treat this as a fatal error - log.error("Attempt to join group with generation {} failed because the group instance id {} has been fenced by another instance", - rebalanceConfig.groupInstanceId, sentGeneration); + log.error("JoinGroup failed: The group instance id {} has been fenced by another instance. " + + "Sent generation was {}", rebalanceConfig.groupInstanceId, sentGeneration); future.raise(error); } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL || error == Errors.INVALID_SESSION_TIMEOUT @@ -627,7 +629,7 @@ public abstract class AbstractCoordinator implements Closeable { || error == Errors.GROUP_AUTHORIZATION_FAILED || error == Errors.GROUP_MAX_SIZE_REACHED) { // log the error and re-throw the exception - log.error("Attempt to join group failed due to fatal error: {}", error.message()); + log.error("JoinGroup failed due to fatal error: {}", error.message()); if (error == Errors.GROUP_MAX_SIZE_REACHED) { future.raise(new GroupMaxSizeReachedException("Consumer group " + rebalanceConfig.groupId + " already has the configured maximum number of members.")); @@ -637,21 +639,22 @@ public abstract class AbstractCoordinator implements Closeable { future.raise(error); } } else if (error == Errors.UNSUPPORTED_VERSION) { - log.error("Attempt to join group failed due to unsupported version error. Please unset field group.instance.id and retry" + - " to see if the problem resolves"); + log.error("JoinGroup failed due to unsupported version error. Please unset field group.instance.id " + + "and retry to see if the problem resolves"); future.raise(error); } else if (error == Errors.MEMBER_ID_REQUIRED) { // Broker requires a concrete member id to be allowed to join the group. 
Update member id // and send another join group request in next cycle. String memberId = joinResponse.data().memberId(); - log.debug("Attempt to join group returned {} error. Will set the member id as {} and then rejoin", error, memberId); + log.debug("JoinGroup failed due to non-fatal error: {} Will set the member id as {} and then rejoin. " + + "Sent generation was {}", error, memberId, sentGeneration); synchronized (AbstractCoordinator.this) { AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID, memberId, null); } future.raise(error); } else { // unexpected error, throw the exception - log.error("Attempt to join group failed due to unexpected error: {}", error.message()); + log.error("JoinGroup failed due to unexpected error: {}", error.message()); future.raise(new KafkaException("Unexpected error in join group response: " + error.message())); } } @@ -732,7 +735,7 @@ public abstract class AbstractCoordinator implements Closeable { sensors.syncSensor.record(response.requestLatencyMs()); synchronized (AbstractCoordinator.this) { - if (generation != Generation.NO_GENERATION && state == MemberState.COMPLETING_REBALANCE) { + if (!generation.equals(Generation.NO_GENERATION) && state == MemberState.COMPLETING_REBALANCE) { // check protocol name only if the generation is not reset final String protocolName = syncResponse.data.protocolName(); final boolean protocolNameInconsistent = protocolName != null && @@ -755,8 +758,8 @@ public abstract class AbstractCoordinator implements Closeable { future.complete(ByteBuffer.wrap(syncResponse.data.assignment())); } } else { - log.info("Generation data was cleared by heartbeat thread as {} and state is now {} before " + - "received SyncGroup response, marking this rebalance as failed and retry", + log.info("Generation data was cleared by heartbeat thread to {} and state is now {} before " + + "receiving SyncGroup response, marking this rebalance as failed and retry", generation, state); 
// use ILLEGAL_GENERATION error code to let it retry immediately future.raise(Errors.ILLEGAL_GENERATION); @@ -769,24 +772,27 @@ public abstract class AbstractCoordinator implements Closeable { if (error == Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId)); } else if (error == Errors.REBALANCE_IN_PROGRESS) { - log.debug("SyncGroup failed because the group began another rebalance"); + log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " + + "Sent generation was {}", sentGeneration); future.raise(error); } else if (error == Errors.FENCED_INSTANCE_ID) { // for sync-group request, even if the generation has changed we would not expect the instance id // gets fenced, and hence we always treat this as a fatal error - log.error("SyncGroup with {} failed because the group instance id {} has been fenced by another instance", - sentGeneration, rebalanceConfig.groupInstanceId); + log.error("SyncGroup failed: The group instance id {} has been fenced by another instance. " + + "Sent generation was {}", rebalanceConfig.groupInstanceId, sentGeneration); future.raise(error); } else if (error == Errors.UNKNOWN_MEMBER_ID || error == Errors.ILLEGAL_GENERATION) { - log.info("SyncGroup with {} failed: {}, would request re-join", sentGeneration, error.message()); + log.info("SyncGroup failed: {} Need to re-join the group. Sent generation was {}", + error.message(), sentGeneration); if (generationUnchanged()) resetGenerationOnResponseError(ApiKeys.SYNC_GROUP, error); future.raise(error); } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) { - log.debug("SyncGroup failed: {}, marking coordinator unknown", error.message()); + log.info("SyncGroup failed: {} Marking coordinator unknown. 
Sent generation was {}", + error.message(), sentGeneration); markCoordinatorUnknown(); future.raise(error); } else { @@ -1484,18 +1490,18 @@ public abstract class AbstractCoordinator implements Closeable { * @return true if the two ids are matching. */ final boolean hasMatchingGenerationId(int generationId) { - return generation != Generation.NO_GENERATION && generation.generationId == generationId; + return !generation.equals(Generation.NO_GENERATION) && generation.generationId == generationId; } final boolean hasUnknownGeneration() { - return generation == Generation.NO_GENERATION; + return generation.equals(Generation.NO_GENERATION); } /** * @return true if the current generation's member ID is valid, false otherwise */ final boolean hasValidMemberId() { - return generation != Generation.NO_GENERATION && generation.hasMemberId(); + return !hasUnknownGeneration() && generation.hasMemberId(); } final synchronized void setNewGeneration(final Generation generation) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 9ab2901..80be7a9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -408,7 +408,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions)); // If revoked any partitions, need to re-join the group afterwards - log.debug("Need to revoke partitions {} and re-join the group", revokedPartitions); + log.info("Need to revoke partitions {} and re-join the group", revokedPartitions); requestRejoin(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 37c0d88..d7df48f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -1058,14 +1058,14 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf if (!activeTasksRemovedPendingRevokation.isEmpty()) { // TODO: once KAFKA-10078 is resolved we can leave it to the client to trigger this rebalance - log.info("Requesting {} followup rebalance be scheduled immediately due to tasks changing ownership.", consumer); + log.info("Requesting followup rebalance be scheduled immediately by {} due to tasks changing ownership.", consumer); info.setNextRebalanceTime(0L); followupRebalanceRequiredForRevokedTasks = true; // Don't bother to schedule a probing rebalance if an immediate one is already scheduled shouldEncodeProbingRebalance = false; } else if (shouldEncodeProbingRebalance) { final long nextRebalanceTimeMs = time.milliseconds() + probingRebalanceIntervalMs(); - log.info("Requesting {} followup rebalance be scheduled for {} ms to probe for caught-up replica tasks.", + log.info("Requesting followup rebalance be scheduled by {} for {} ms to probe for caught-up replica tasks.", consumer, nextRebalanceTimeMs); info.setNextRebalanceTime(nextRebalanceTimeMs); shouldEncodeProbingRebalance = false;