This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a7e9619734f MINOR: Additional logging for ISR and replica set changes (#21221)
a7e9619734f is described below
commit a7e9619734f4d9c097bd867928497f2056d99884
Author: Alyssa Huang <[email protected]>
AuthorDate: Mon Jan 19 10:30:27 2026 -0800
MINOR: Additional logging for ISR and replica set changes (#21221)
Adds more detailed logging around partition info (e.g. partitionEpoch,
leaderEpoch, priorISR, priorReplicaSet) for ISR and replica set changes.
This should help with debugging issues in the future.
Reviewers: José Armando García Sancio <[email protected]>
---
.../controller/ReplicationControlManager.java | 117 ++++++++++++++-------
1 file changed, 78 insertions(+), 39 deletions(-)
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index a1e93b3f10f..63c3bb78765 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -518,9 +518,15 @@ public class ReplicationControlManager {
}
if (record.removingReplicas() != null || record.addingReplicas() !=
null) {
- log.info("Replayed partition assignment change {} for topic {}",
record, topicInfo.name);
+ log.info("Replayed partition assignment change {} for topic {}. " +
+ "(isr: {} -> {}, replicaSet: {} -> {}, partitionEpoch: {},
leaderEpoch: {})",
+ record, topicInfo.name, prevPartitionInfo.isr,
newPartitionInfo.isr, prevPartitionInfo.replicas,
+ newPartitionInfo.replicas, newPartitionInfo.partitionEpoch,
newPartitionInfo.leaderEpoch);
} else if (log.isDebugEnabled()) {
- log.debug("Replayed partition change {} for topic {}", record,
topicInfo.name);
+ log.debug("Replayed partition change {} for topic {}. " +
+ "(isr: {} -> {}, replicaSet: {} -> {}, partitionEpoch: {},
leaderEpoch: {})",
+ record, topicInfo.name, prevPartitionInfo.isr,
newPartitionInfo.isr, prevPartitionInfo.replicas,
+ newPartitionInfo.replicas, newPartitionInfo.partitionEpoch,
newPartitionInfo.leaderEpoch);
}
}
@@ -1141,10 +1147,11 @@ public class ReplicationControlManager {
if (record.isPresent()) {
records.add(record.get());
PartitionChangeRecord change = (PartitionChangeRecord)
record.get().message();
- partition = partition.merge(change);
+ PartitionRegistration originalPartition = partition;
+ partition = originalPartition.merge(change);
if (log.isDebugEnabled()) {
- log.debug("Node {} has altered ISR for {}-{} to {}.",
- request.brokerId(), topic.name, partitionId,
change.isr());
+ log.debug("Node {} has altered ISR for {}-{}. {}",
+ request.brokerId(), topic.name, partitionId,
logPartitionChangeInfo(originalPartition, partition));
}
if (change.leader() != request.brokerId() &&
change.leader() != NO_LEADER_CHANGE) {
@@ -1161,17 +1168,17 @@ public class ReplicationControlManager {
// metadata record. We usually only do one or the
other.
Errors error = NEW_LEADER_ELECTED;
log.info("AlterPartition request from node {} for
{}-{} completed " +
- "the ongoing partition reassignment and triggered
a " +
- "leadership change. Returning {}.",
- request.brokerId(), topic.name, partitionId,
error);
+ "the ongoing partition reassignment and triggered
a leadership change {}. Returning {}.",
+ request.brokerId(), topic.name, partitionId,
+ logPartitionChangeInfo(originalPartition,
partition), error);
responseTopicData.partitions().add(new
AlterPartitionResponseData.PartitionData().
setPartitionIndex(partitionId).
setErrorCode(error.code()));
continue;
} else if (isReassignmentInProgress(partition)) {
log.info("AlterPartition request from node {} for
{}-{} completed " +
- "the ongoing partition reassignment.",
request.brokerId(),
- topic.name, partitionId);
+ "the ongoing partition reassignment. {}",
+ request.brokerId(), topic.name, partitionId,
logPartitionChangeInfo(originalPartition, partition));
}
}
@@ -1194,6 +1201,14 @@ public class ReplicationControlManager {
return ControllerResult.of(records, response);
}
+ private static String logPartitionChangeInfo(PartitionRegistration
oldRegistration, PartitionRegistration newRegistration) {
+ return String.format("isr: %s -> %s, replicaSet: %s -> %s,
partitionEpoch: %d -> %d, leaderEpoch: %d -> %d",
+ Arrays.toString(oldRegistration.isr),
Arrays.toString(newRegistration.isr),
+ Arrays.toString(oldRegistration.replicas),
Arrays.toString(newRegistration.replicas),
+ oldRegistration.partitionEpoch, newRegistration.partitionEpoch,
+ oldRegistration.leaderEpoch, newRegistration.leaderEpoch);
+ }
+
/**
* Validates that a batch of topics will create less than {@value
MAX_PARTITIONS_PER_BATCH}. Exceeding this number of topics per batch
* has led to out-of-memory exceptions. We use this validation to fail
earlier to avoid allocating the memory.
@@ -1252,35 +1267,38 @@ public class ReplicationControlManager {
// this case to give the leader an opportunity to find the new
controller.
if (partitionData.leaderEpoch() > partition.leaderEpoch) {
log.debug("Rejecting AlterPartition request from node {} for {}-{}
because " +
- "the current leader epoch is {}, which is greater than the
local value {}.",
- brokerId, topic.name, partitionId, partition.leaderEpoch,
partitionData.leaderEpoch());
+ "the current leader epoch is {}, which is greater than the
local value {}. {}",
+ brokerId, topic.name, partitionId, partition.leaderEpoch,
partitionData.leaderEpoch(),
+ logPartitionChangeInfo(partition,
partitionData.newIsrWithEpochs()));
return NOT_CONTROLLER;
}
if (partitionData.partitionEpoch() > partition.partitionEpoch) {
log.debug("Rejecting AlterPartition request from node {} for {}-{}
because " +
- "the current partition epoch is {}, which is greater than
the local value {}.",
- brokerId, topic.name, partitionId, partition.partitionEpoch,
partitionData.partitionEpoch());
+ "the current partition epoch is {}, which is greater than
the local value {}. {}",
+ brokerId, topic.name, partitionId, partition.partitionEpoch,
partitionData.partitionEpoch(),
+ logPartitionChangeInfo(partition,
partitionData.newIsrWithEpochs()));
return NOT_CONTROLLER;
}
if (partitionData.leaderEpoch() < partition.leaderEpoch) {
log.debug("Rejecting AlterPartition request from node {} for {}-{}
because " +
- "the current leader epoch is {}, not {}.", brokerId,
topic.name,
- partitionId, partition.leaderEpoch,
partitionData.leaderEpoch());
+ "the current leader epoch is {}, not {}. {}", brokerId,
topic.name,
+ partitionId, partition.leaderEpoch,
partitionData.leaderEpoch(),
+ logPartitionChangeInfo(partition,
partitionData.newIsrWithEpochs()));
return FENCED_LEADER_EPOCH;
}
if (brokerId != partition.leader) {
log.info("Rejecting AlterPartition request from node {} for {}-{}
because " +
- "the current leader is {}.", brokerId, topic.name,
- partitionId, partition.leader);
+ "the current leader is {}. {}", brokerId, topic.name,
+ partitionId, partition.leader,
logPartitionChangeInfo(partition, partitionData.newIsrWithEpochs()));
return INVALID_REQUEST;
}
if (partitionData.partitionEpoch() < partition.partitionEpoch) {
log.info("Rejecting AlterPartition request from node {} for {}-{}
because " +
- "the current partition epoch is {}, not {}.", brokerId,
+ "the current partition epoch is {}, not {}. {}", brokerId,
topic.name, partitionId, partition.partitionEpoch,
- partitionData.partitionEpoch());
+ partitionData.partitionEpoch(),
logPartitionChangeInfo(partition, partitionData.newIsrWithEpochs()));
return INVALID_UPDATE_VERSION;
}
@@ -1290,34 +1308,36 @@ public class ReplicationControlManager {
if (!Replicas.validateIsr(partition.replicas, newIsr)) {
log.error("Rejecting AlterPartition request from node {} for {}-{}
because " +
- "it specified an invalid ISR {}.", brokerId,
- topic.name, partitionId, partitionData.newIsrWithEpochs());
+ "it specified an invalid ISR. {}", brokerId,
+ topic.name, partitionId, logPartitionChangeInfo(partition,
partitionData.newIsrWithEpochs()));
return INVALID_REQUEST;
}
if (!Replicas.contains(newIsr, partition.leader)) {
// The ISR must always include the current leader.
log.error("Rejecting AlterPartition request from node {} for {}-{}
because " +
- "it specified an invalid ISR {} that doesn't include
itself.",
- brokerId, topic.name, partitionId,
partitionData.newIsrWithEpochs());
+ "it specified an invalid ISR that doesn't include itself.
{}",
+ brokerId, topic.name, partitionId,
+ logPartitionChangeInfo(partition,
partitionData.newIsrWithEpochs()));
return INVALID_REQUEST;
}
LeaderRecoveryState leaderRecoveryState =
LeaderRecoveryState.of(partitionData.leaderRecoveryState());
if (leaderRecoveryState == LeaderRecoveryState.RECOVERING &&
newIsr.length > 1) {
log.info("Rejecting AlterPartition request from node {} for {}-{}
because " +
- "the ISR {} had more than one replica while the leader was
still " +
- "recovering from an unclean leader election {}.",
- brokerId, topic.name, partitionId,
partitionData.newIsrWithEpochs(),
- leaderRecoveryState);
+ "the ISR had more than one replica while the leader was
still " +
+ "recovering from an unclean leader election {}. {}",
+ brokerId, topic.name, partitionId, leaderRecoveryState,
+ logPartitionChangeInfo(partition,
partitionData.newIsrWithEpochs()));
return INVALID_REQUEST;
}
if (partition.leaderRecoveryState == LeaderRecoveryState.RECOVERED &&
leaderRecoveryState == LeaderRecoveryState.RECOVERING) {
log.info("Rejecting AlterPartition request from node {} for {}-{}
because " +
- "the leader recovery state cannot change from RECOVERED to
RECOVERING.",
- brokerId, topic.name, partitionId);
+ "the leader recovery state cannot change from RECOVERED to
RECOVERING. {}",
+ brokerId, topic.name, partitionId,
+ logPartitionChangeInfo(partition,
partitionData.newIsrWithEpochs()));
return INVALID_REQUEST;
}
@@ -1325,14 +1345,25 @@ public class ReplicationControlManager {
List<IneligibleReplica> ineligibleReplicas =
ineligibleReplicasForIsr(partitionData.newIsrWithEpochs());
if (!ineligibleReplicas.isEmpty()) {
log.info("Rejecting AlterPartition request from node {} for {}-{}
because " +
- "it specified ineligible replicas {} in the new ISR {}.",
- brokerId, topic.name, partitionId, ineligibleReplicas,
partitionData.newIsrWithEpochs());
+ "it specified ineligible replicas {} in the new ISR. {}",
+ brokerId, topic.name, partitionId, ineligibleReplicas,
+ logPartitionChangeInfo(partition,
partitionData.newIsrWithEpochs()));
return INELIGIBLE_REPLICA;
}
return Errors.NONE;
}
+ private static String logPartitionChangeInfo(
+ PartitionRegistration partition,
+ List<BrokerState> requestedIsr
+ ) {
+ return String.format("Proposed ISR was %s and current ISR is %s. " +
+ "Current replica set is %s. Current partitionEpoch is %d.
Current leaderEpoch is %d.",
+ requestedIsr, Arrays.toString(partition.isr),
Arrays.toString(partition.replicas),
+ partition.partitionEpoch, partition.leaderEpoch);
+ }
+
private List<IneligibleReplica> ineligibleReplicasForIsr(List<BrokerState>
brokerStates) {
List<IneligibleReplica> ineligibleReplicas = new ArrayList<>(0);
for (BrokerState brokerState : brokerStates) {
@@ -1788,25 +1819,33 @@ public class ReplicationControlManager {
Iterator<TopicIdPartition> iterator =
brokersToIsrs.partitionsWithNoLeader();
while (iterator.hasNext() && records.size() < maxElections) {
TopicIdPartition topicIdPartition = iterator.next();
+ int partitionId = topicIdPartition.partitionId();
TopicControlInfo topic = topics.get(topicIdPartition.topicId());
+
if
(configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name)) {
- ApiError result = electLeader(topic.name,
topicIdPartition.partitionId(),
+ ApiError result = electLeader(topic.name, partitionId,
ElectionType.UNCLEAN, records);
if (result.error().equals(Errors.NONE)) {
- log.info("Triggering unclean leader election for offline
partition {}-{}.",
- topic.name, topicIdPartition.partitionId());
+ log.info("Triggering unclean leader election for offline
partition {}-{}. {}",
+ topic.name, partitionId,
logPartitionInfo(topic.parts.get(partitionId)));
} else {
- log.warn("Cannot trigger unclean leader election for
offline partition {}-{}: {}",
- topic.name, topicIdPartition.partitionId(),
result.error());
+ log.warn("Cannot trigger unclean leader election for
offline partition {}-{}: {}. {}",
+ topic.name, partitionId, result.error(),
logPartitionInfo(topic.parts.get(partitionId)));
}
} else if (log.isDebugEnabled()) {
log.debug("Cannot trigger unclean leader election for offline
partition {}-{} " +
- "because unclean leader election is disabled
for this topic.",
- topic.name, topicIdPartition.partitionId());
+ "because unclean leader election is disabled
for this topic. {}",
+ topic.name, partitionId,
logPartitionInfo(topic.parts.get(partitionId)));
}
}
}
+ private static String logPartitionInfo(PartitionRegistration partition) {
+ return String.format("(isr: %s, replicaSet: %s, partitionEpoch: %d,
leaderEpoch: %d)",
+ Arrays.toString(partition.isr),
Arrays.toString(partition.replicas), partition.partitionEpoch,
+ partition.leaderEpoch);
+ }
+
ControllerResult<List<CreatePartitionsTopicResult>> createPartitions(
ControllerRequestContext context,
List<CreatePartitionsTopic> topics