This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bd462df MINOR: standardize rebalance related logging for easy discovery & debugging (#9295)
bd462df is described below
commit bd462df20321ff5b75a7e3eae70634268582d90b
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Fri Sep 25 20:29:17 2020 -0700
MINOR: standardize rebalance related logging for easy discovery & debugging (#9295)
Some minor logging adjustments to standardize the grammar of rebalance-related
messages and make it easy to query the logs for quick debugging results.
Reviewers: Guozhang Wang <[email protected]>
---
.../consumer/internals/AbstractCoordinator.java | 52 ++++++++++++----------
.../consumer/internals/ConsumerCoordinator.java | 2 +-
.../internals/StreamsPartitionAssignor.java | 4 +-
3 files changed, 32 insertions(+), 26 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 080d9c4..b021c91 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -444,7 +444,7 @@ public abstract class AbstractCoordinator implements
Closeable {
stateSnapshot = this.state;
}
- if (generationSnapshot != Generation.NO_GENERATION &&
stateSnapshot == MemberState.STABLE) {
+ if (!generationSnapshot.equals(Generation.NO_GENERATION) &&
stateSnapshot == MemberState.STABLE) {
// Duplicate the buffer in case `onJoinComplete` does not
complete and needs to be retried.
ByteBuffer memberAssignment = future.value().duplicate();
@@ -563,7 +563,7 @@ public abstract class AbstractCoordinator implements
Closeable {
Errors error = joinResponse.error();
if (error == Errors.NONE) {
if
(isProtocolTypeInconsistent(joinResponse.data().protocolType())) {
- log.error("JoinGroup failed due to inconsistent Protocol
Type, received {} but expected {}",
+ log.error("JoinGroup failed: Inconsistent Protocol Type,
received {} but expected {}",
joinResponse.data().protocolType(), protocolType());
future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
} else {
@@ -598,11 +598,12 @@ public abstract class AbstractCoordinator implements
Closeable {
}
}
} else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
- log.debug("Attempt to join group rejected since coordinator {}
is loading the group.", coordinator());
+ log.info("JoinGroup failed: Coordinator {} is loading the
group.", coordinator());
// backoff and retry
future.raise(error);
} else if (error == Errors.UNKNOWN_MEMBER_ID) {
- log.debug("Attempt to join group failed due to unknown member
id with {}.", sentGeneration);
+ log.info("JoinGroup failed: {} Need to re-join the group. Sent
generation was {}",
+ error.message(), sentGeneration);
// only need to reset the member id if generation has not been
changed,
// then retry immediately
if (generationUnchanged())
@@ -613,13 +614,14 @@ public abstract class AbstractCoordinator implements
Closeable {
|| error == Errors.NOT_COORDINATOR) {
// re-discover the coordinator and retry with backoff
markCoordinatorUnknown();
- log.debug("Attempt to join group failed due to obsolete
coordinator information: {}", error.message());
+ log.info("JoinGroup failed: {} Marking coordinator unknown.
Sent generation was {}",
+ error.message(), sentGeneration);
future.raise(error);
} else if (error == Errors.FENCED_INSTANCE_ID) {
// for join-group request, even if the generation has changed
we would not expect the instance id
// gets fenced, and hence we always treat this as a fatal error
- log.error("Attempt to join group with generation {} failed
because the group instance id {} has been fenced by another instance",
- rebalanceConfig.groupInstanceId, sentGeneration);
+ log.error("JoinGroup failed: The group instance id {} has been
fenced by another instance. " +
+ "Sent generation was {}",
rebalanceConfig.groupInstanceId, sentGeneration);
future.raise(error);
} else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
|| error == Errors.INVALID_SESSION_TIMEOUT
@@ -627,7 +629,7 @@ public abstract class AbstractCoordinator implements
Closeable {
|| error == Errors.GROUP_AUTHORIZATION_FAILED
|| error == Errors.GROUP_MAX_SIZE_REACHED) {
// log the error and re-throw the exception
- log.error("Attempt to join group failed due to fatal error:
{}", error.message());
+ log.error("JoinGroup failed due to fatal error: {}",
error.message());
if (error == Errors.GROUP_MAX_SIZE_REACHED) {
future.raise(new GroupMaxSizeReachedException("Consumer
group " + rebalanceConfig.groupId +
" already has the configured maximum number of
members."));
@@ -637,21 +639,22 @@ public abstract class AbstractCoordinator implements
Closeable {
future.raise(error);
}
} else if (error == Errors.UNSUPPORTED_VERSION) {
- log.error("Attempt to join group failed due to unsupported
version error. Please unset field group.instance.id and retry" +
- " to see if the problem resolves");
+ log.error("JoinGroup failed due to unsupported version error.
Please unset field group.instance.id " +
+ "and retry to see if the problem resolves");
future.raise(error);
} else if (error == Errors.MEMBER_ID_REQUIRED) {
// Broker requires a concrete member id to be allowed to join
the group. Update member id
// and send another join group request in next cycle.
String memberId = joinResponse.data().memberId();
- log.debug("Attempt to join group returned {} error. Will set
the member id as {} and then rejoin", error, memberId);
+ log.debug("JoinGroup failed due to non-fatal error: {} Will
set the member id as {} and then rejoin. " +
+ "Sent generation was {}", error, memberId,
sentGeneration);
synchronized (AbstractCoordinator.this) {
AbstractCoordinator.this.generation = new
Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID, memberId, null);
}
future.raise(error);
} else {
// unexpected error, throw the exception
- log.error("Attempt to join group failed due to unexpected
error: {}", error.message());
+ log.error("JoinGroup failed due to unexpected error: {}",
error.message());
future.raise(new KafkaException("Unexpected error in join
group response: " + error.message()));
}
}
@@ -732,7 +735,7 @@ public abstract class AbstractCoordinator implements
Closeable {
sensors.syncSensor.record(response.requestLatencyMs());
synchronized (AbstractCoordinator.this) {
- if (generation != Generation.NO_GENERATION && state ==
MemberState.COMPLETING_REBALANCE) {
+ if (!generation.equals(Generation.NO_GENERATION) &&
state == MemberState.COMPLETING_REBALANCE) {
// check protocol name only if the generation is
not reset
final String protocolName =
syncResponse.data.protocolName();
final boolean protocolNameInconsistent =
protocolName != null &&
@@ -755,8 +758,8 @@ public abstract class AbstractCoordinator implements
Closeable {
future.complete(ByteBuffer.wrap(syncResponse.data.assignment()));
}
} else {
- log.info("Generation data was cleared by heartbeat
thread as {} and state is now {} before " +
- "received SyncGroup response, marking this
rebalance as failed and retry",
+ log.info("Generation data was cleared by heartbeat
thread to {} and state is now {} before " +
+ "receiving SyncGroup response, marking this
rebalance as failed and retry",
generation, state);
// use ILLEGAL_GENERATION error code to let it
retry immediately
future.raise(Errors.ILLEGAL_GENERATION);
@@ -769,24 +772,27 @@ public abstract class AbstractCoordinator implements
Closeable {
if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
- log.debug("SyncGroup failed because the group began
another rebalance");
+ log.info("SyncGroup failed: The group began another
rebalance. Need to re-join the group. " +
+ "Sent generation was {}", sentGeneration);
future.raise(error);
} else if (error == Errors.FENCED_INSTANCE_ID) {
// for sync-group request, even if the generation has
changed we would not expect the instance id
// gets fenced, and hence we always treat this as a fatal
error
- log.error("SyncGroup with {} failed because the group
instance id {} has been fenced by another instance",
- sentGeneration, rebalanceConfig.groupInstanceId);
+ log.error("SyncGroup failed: The group instance id {} has
been fenced by another instance. " +
+ "Sent generation was {}",
rebalanceConfig.groupInstanceId, sentGeneration);
future.raise(error);
} else if (error == Errors.UNKNOWN_MEMBER_ID
|| error == Errors.ILLEGAL_GENERATION) {
- log.info("SyncGroup with {} failed: {}, would request
re-join", sentGeneration, error.message());
+ log.info("SyncGroup failed: {} Need to re-join the group.
Sent generation was {}",
+ error.message(), sentGeneration);
if (generationUnchanged())
resetGenerationOnResponseError(ApiKeys.SYNC_GROUP,
error);
future.raise(error);
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR) {
- log.debug("SyncGroup failed: {}, marking coordinator
unknown", error.message());
+ log.info("SyncGroup failed: {} Marking coordinator
unknown. Sent generation was {}",
+ error.message(), sentGeneration);
markCoordinatorUnknown();
future.raise(error);
} else {
@@ -1484,18 +1490,18 @@ public abstract class AbstractCoordinator implements
Closeable {
* @return true if the two ids are matching.
*/
final boolean hasMatchingGenerationId(int generationId) {
- return generation != Generation.NO_GENERATION &&
generation.generationId == generationId;
+ return !generation.equals(Generation.NO_GENERATION) &&
generation.generationId == generationId;
}
final boolean hasUnknownGeneration() {
- return generation == Generation.NO_GENERATION;
+ return generation.equals(Generation.NO_GENERATION);
}
/**
* @return true if the current generation's member ID is valid, false
otherwise
*/
final boolean hasValidMemberId() {
- return generation != Generation.NO_GENERATION &&
generation.hasMemberId();
+ return !hasUnknownGeneration() && generation.hasMemberId();
}
final synchronized void setNewGeneration(final Generation generation) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 9ab2901..80be7a9 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -408,7 +408,7 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
firstException.compareAndSet(null,
invokePartitionsRevoked(revokedPartitions));
// If revoked any partitions, need to re-join the group
afterwards
- log.debug("Need to revoke partitions {} and re-join the
group", revokedPartitions);
+ log.info("Need to revoke partitions {} and re-join the group",
revokedPartitions);
requestRejoin();
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 37c0d88..d7df48f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -1058,14 +1058,14 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
if (!activeTasksRemovedPendingRevokation.isEmpty()) {
// TODO: once KAFKA-10078 is resolved we can leave it to the
client to trigger this rebalance
- log.info("Requesting {} followup rebalance be scheduled
immediately due to tasks changing ownership.", consumer);
+ log.info("Requesting followup rebalance be scheduled
immediately by {} due to tasks changing ownership.", consumer);
info.setNextRebalanceTime(0L);
followupRebalanceRequiredForRevokedTasks = true;
// Don't bother to schedule a probing rebalance if an
immediate one is already scheduled
shouldEncodeProbingRebalance = false;
} else if (shouldEncodeProbingRebalance) {
final long nextRebalanceTimeMs = time.milliseconds() +
probingRebalanceIntervalMs();
- log.info("Requesting {} followup rebalance be scheduled for {}
ms to probe for caught-up replica tasks.",
+ log.info("Requesting followup rebalance be scheduled by {} for
{} ms to probe for caught-up replica tasks.",
consumer, nextRebalanceTimeMs);
info.setNextRebalanceTime(nextRebalanceTimeMs);
shouldEncodeProbingRebalance = false;