This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a7e9619734f MINOR: Additional logging for ISR and replica set changes (#21221)
a7e9619734f is described below

commit a7e9619734f4d9c097bd867928497f2056d99884
Author: Alyssa Huang <[email protected]>
AuthorDate: Mon Jan 19 10:30:27 2026 -0800

    MINOR: Additional logging for ISR and replica set changes (#21221)
    
    Adds more detailed logging around partition info (e.g. partitionEpoch,
    leaderEpoch, priorISR, priorReplicaSet) for ISR and replica set changes.
    This should help with debugging issues in the future.
    
    Reviewers: José Armando García Sancio <[email protected]>
---
 .../controller/ReplicationControlManager.java      | 117 ++++++++++++++-------
 1 file changed, 78 insertions(+), 39 deletions(-)
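
For illustration only (the topic name, replica IDs, and epochs below are
made up), the enhanced replay log line added by this patch would look
roughly like:

    INFO Replayed partition assignment change PartitionChangeRecord(...) for
    topic foo. (isr: [1, 2, 3] -> [1, 2], replicaSet: [1, 2, 3] -> [1, 2, 3],
    partitionEpoch: 42, leaderEpoch: 7)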

diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index a1e93b3f10f..63c3bb78765 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -518,9 +518,15 @@ public class ReplicationControlManager {
         }
 
         if (record.removingReplicas() != null || record.addingReplicas() != null) {
-            log.info("Replayed partition assignment change {} for topic {}", record, topicInfo.name);
+            log.info("Replayed partition assignment change {} for topic {}. " +
+                    "(isr: {} -> {}, replicaSet: {} -> {}, partitionEpoch: {}, leaderEpoch: {})",
+                record, topicInfo.name, prevPartitionInfo.isr, newPartitionInfo.isr, prevPartitionInfo.replicas,
+                newPartitionInfo.replicas, newPartitionInfo.partitionEpoch, newPartitionInfo.leaderEpoch);
         } else if (log.isDebugEnabled()) {
-            log.debug("Replayed partition change {} for topic {}", record, topicInfo.name);
+            log.debug("Replayed partition change {} for topic {}. " +
+                    "(isr: {} -> {}, replicaSet: {} -> {}, partitionEpoch: {}, leaderEpoch: {})",
+                record, topicInfo.name, prevPartitionInfo.isr, newPartitionInfo.isr, prevPartitionInfo.replicas,
+                newPartitionInfo.replicas, newPartitionInfo.partitionEpoch, newPartitionInfo.leaderEpoch);
         }
     }
 
@@ -1141,10 +1147,11 @@ public class ReplicationControlManager {
                 if (record.isPresent()) {
                     records.add(record.get());
                     PartitionChangeRecord change = (PartitionChangeRecord) record.get().message();
-                    partition = partition.merge(change);
+                    PartitionRegistration originalPartition = partition;
+                    partition = originalPartition.merge(change);
                     if (log.isDebugEnabled()) {
-                        log.debug("Node {} has altered ISR for {}-{} to {}.",
-                            request.brokerId(), topic.name, partitionId, change.isr());
+                        log.debug("Node {} has altered ISR for {}-{}. {}",
+                            request.brokerId(), topic.name, partitionId, logPartitionChangeInfo(originalPartition, partition));
                     }
                     if (change.leader() != request.brokerId() &&
                             change.leader() != NO_LEADER_CHANGE) {
@@ -1161,17 +1168,17 @@ public class ReplicationControlManager {
                         // metadata record. We usually only do one or the other.
                         Errors error = NEW_LEADER_ELECTED;
                         log.info("AlterPartition request from node {} for {}-{} completed " +
-                            "the ongoing partition reassignment and triggered a " +
-                            "leadership change. Returning {}.",
-                            request.brokerId(), topic.name, partitionId, error);
+                            "the ongoing partition reassignment and triggered a leadership change {}. Returning {}.",
+                            request.brokerId(), topic.name, partitionId,
+                            logPartitionChangeInfo(originalPartition, partition), error);
                         responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
                             setPartitionIndex(partitionId).
                             setErrorCode(error.code()));
                         continue;
                     } else if (isReassignmentInProgress(partition)) {
                         log.info("AlterPartition request from node {} for {}-{} completed " +
-                            "the ongoing partition reassignment.", request.brokerId(),
-                            topic.name, partitionId);
+                            "the ongoing partition reassignment. {}",
+                            request.brokerId(), topic.name, partitionId, logPartitionChangeInfo(originalPartition, partition));
                     }
                 }
 
@@ -1194,6 +1201,14 @@ public class ReplicationControlManager {
         return ControllerResult.of(records, response);
     }
 
+    private static String logPartitionChangeInfo(PartitionRegistration oldRegistration, PartitionRegistration newRegistration) {
+        return String.format("isr: %s -> %s, replicaSet: %s -> %s, partitionEpoch: %d -> %d, leaderEpoch: %d -> %d",
+            Arrays.toString(oldRegistration.isr), Arrays.toString(newRegistration.isr),
+            Arrays.toString(oldRegistration.replicas), Arrays.toString(newRegistration.replicas),
+            oldRegistration.partitionEpoch, newRegistration.partitionEpoch,
+            oldRegistration.leaderEpoch, newRegistration.leaderEpoch);
+    }
+
     /**
      * Validates that a batch of topics will create less than {@value MAX_PARTITIONS_PER_BATCH}. Exceeding this number of topics per batch
      * has led to out-of-memory exceptions. We use this validation to fail earlier to avoid allocating the memory.
@@ -1252,35 +1267,38 @@ public class ReplicationControlManager {
         // this case to give the leader an opportunity to find the new controller.
         if (partitionData.leaderEpoch() > partition.leaderEpoch) {
             log.debug("Rejecting AlterPartition request from node {} for {}-{} because " +
-                    "the current leader epoch is {}, which is greater than the local value {}.",
-                brokerId, topic.name, partitionId, partition.leaderEpoch, partitionData.leaderEpoch());
+                    "the current leader epoch is {}, which is greater than the local value {}. {}",
+                brokerId, topic.name, partitionId, partition.leaderEpoch, partitionData.leaderEpoch(),
+                logPartitionChangeInfo(partition, partitionData.newIsrWithEpochs()));
             return NOT_CONTROLLER;
         }
         if (partitionData.partitionEpoch() > partition.partitionEpoch) {
             log.debug("Rejecting AlterPartition request from node {} for {}-{} 
because " +
-                    "the current partition epoch is {}, which is greater than 
the local value {}.",
-                brokerId, topic.name, partitionId, partition.partitionEpoch, 
partitionData.partitionEpoch());
+                    "the current partition epoch is {}, which is greater than 
the local value {}. {}",
+                brokerId, topic.name, partitionId, partition.partitionEpoch, 
partitionData.partitionEpoch(),
+                logPartitionChangeInfo(partition, 
partitionData.newIsrWithEpochs()));
             return NOT_CONTROLLER;
         }
         if (partitionData.leaderEpoch() < partition.leaderEpoch) {
             log.debug("Rejecting AlterPartition request from node {} for {}-{} 
because " +
-                    "the current leader epoch is {}, not {}.", brokerId, 
topic.name,
-                    partitionId, partition.leaderEpoch, 
partitionData.leaderEpoch());
+                    "the current leader epoch is {}, not {}. {}", brokerId, 
topic.name,
+                    partitionId, partition.leaderEpoch, 
partitionData.leaderEpoch(),
+                    logPartitionChangeInfo(partition, 
partitionData.newIsrWithEpochs()));
 
             return FENCED_LEADER_EPOCH;
         }
         if (brokerId != partition.leader) {
             log.info("Rejecting AlterPartition request from node {} for {}-{} 
because " +
-                    "the current leader is {}.", brokerId, topic.name,
-                    partitionId, partition.leader);
+                    "the current leader is {}. {}", brokerId, topic.name,
+                    partitionId, partition.leader, 
logPartitionChangeInfo(partition, partitionData.newIsrWithEpochs()));
 
             return INVALID_REQUEST;
         }
         if (partitionData.partitionEpoch() < partition.partitionEpoch) {
             log.info("Rejecting AlterPartition request from node {} for {}-{} 
because " +
-                    "the current partition epoch is {}, not {}.", brokerId,
+                    "the current partition epoch is {}, not {}. {}", brokerId,
                     topic.name, partitionId, partition.partitionEpoch,
-                    partitionData.partitionEpoch());
+                    partitionData.partitionEpoch(), 
logPartitionChangeInfo(partition, partitionData.newIsrWithEpochs()));
 
             return INVALID_UPDATE_VERSION;
         }
@@ -1290,34 +1308,36 @@ public class ReplicationControlManager {
 
         if (!Replicas.validateIsr(partition.replicas, newIsr)) {
             log.error("Rejecting AlterPartition request from node {} for {}-{} 
because " +
-                    "it specified an invalid ISR {}.", brokerId,
-                    topic.name, partitionId, partitionData.newIsrWithEpochs());
+                    "it specified an invalid ISR. {}", brokerId,
+                    topic.name, partitionId, logPartitionChangeInfo(partition, 
partitionData.newIsrWithEpochs()));
 
             return INVALID_REQUEST;
         }
         if (!Replicas.contains(newIsr, partition.leader)) {
             // The ISR must always include the current leader.
             log.error("Rejecting AlterPartition request from node {} for {}-{} 
because " +
-                    "it specified an invalid ISR {} that doesn't include 
itself.",
-                    brokerId, topic.name, partitionId, 
partitionData.newIsrWithEpochs());
+                    "it specified an invalid ISR that doesn't include itself. 
{}",
+                    brokerId, topic.name, partitionId,
+                    logPartitionChangeInfo(partition, 
partitionData.newIsrWithEpochs()));
 
             return INVALID_REQUEST;
         }
         LeaderRecoveryState leaderRecoveryState = LeaderRecoveryState.of(partitionData.leaderRecoveryState());
         if (leaderRecoveryState == LeaderRecoveryState.RECOVERING && newIsr.length > 1) {
             log.info("Rejecting AlterPartition request from node {} for {}-{} because " +
-                    "the ISR {} had more than one replica while the leader was still " +
-                    "recovering from an unclean leader election {}.",
-                    brokerId, topic.name, partitionId, partitionData.newIsrWithEpochs(),
-                    leaderRecoveryState);
+                    "the ISR had more than one replica while the leader was still " +
+                    "recovering from an unclean leader election {}. {}",
+                    brokerId, topic.name, partitionId, leaderRecoveryState,
+                    logPartitionChangeInfo(partition, partitionData.newIsrWithEpochs()));
 
             return INVALID_REQUEST;
         }
         if (partition.leaderRecoveryState == LeaderRecoveryState.RECOVERED &&
                 leaderRecoveryState == LeaderRecoveryState.RECOVERING) {
             log.info("Rejecting AlterPartition request from node {} for {}-{} 
because " +
-                    "the leader recovery state cannot change from RECOVERED to 
RECOVERING.",
-                    brokerId, topic.name, partitionId);
+                    "the leader recovery state cannot change from RECOVERED to 
RECOVERING. {}",
+                    brokerId, topic.name, partitionId,
+                    logPartitionChangeInfo(partition, 
partitionData.newIsrWithEpochs()));
 
             return INVALID_REQUEST;
         }
@@ -1325,14 +1345,25 @@ public class ReplicationControlManager {
         List<IneligibleReplica> ineligibleReplicas = 
ineligibleReplicasForIsr(partitionData.newIsrWithEpochs());
         if (!ineligibleReplicas.isEmpty()) {
             log.info("Rejecting AlterPartition request from node {} for {}-{} 
because " +
-                    "it specified ineligible replicas {} in the new ISR {}.",
-                    brokerId, topic.name, partitionId, ineligibleReplicas, 
partitionData.newIsrWithEpochs());
+                    "it specified ineligible replicas {} in the new ISR. {}",
+                    brokerId, topic.name, partitionId, ineligibleReplicas,
+                    logPartitionChangeInfo(partition, 
partitionData.newIsrWithEpochs()));
             return INELIGIBLE_REPLICA;
         }
 
         return Errors.NONE;
     }
 
+    private static String logPartitionChangeInfo(
+        PartitionRegistration partition,
+        List<BrokerState> requestedIsr
+    ) {
+        return String.format("Proposed ISR was %s and current ISR is %s. " +
+                "Current replica set is %s. Current partitionEpoch is %d. 
Current leaderEpoch is %d.",
+            requestedIsr, Arrays.toString(partition.isr), 
Arrays.toString(partition.replicas),
+            partition.partitionEpoch, partition.leaderEpoch);
+    }
+
     private List<IneligibleReplica> ineligibleReplicasForIsr(List<BrokerState> 
brokerStates) {
         List<IneligibleReplica> ineligibleReplicas = new ArrayList<>(0);
         for (BrokerState brokerState : brokerStates) {
@@ -1788,25 +1819,33 @@ public class ReplicationControlManager {
         Iterator<TopicIdPartition> iterator = 
brokersToIsrs.partitionsWithNoLeader();
         while (iterator.hasNext() && records.size() < maxElections) {
             TopicIdPartition topicIdPartition = iterator.next();
+            int partitionId = topicIdPartition.partitionId();
             TopicControlInfo topic = topics.get(topicIdPartition.topicId());
+
             if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name)) {
-                ApiError result = electLeader(topic.name, topicIdPartition.partitionId(),
+                ApiError result = electLeader(topic.name, partitionId,
                         ElectionType.UNCLEAN, records);
                 if (result.error().equals(Errors.NONE)) {
-                    log.info("Triggering unclean leader election for offline 
partition {}-{}.",
-                            topic.name, topicIdPartition.partitionId());
+                    log.info("Triggering unclean leader election for offline 
partition {}-{}. {}",
+                            topic.name, partitionId, 
logPartitionInfo(topic.parts.get(partitionId)));
                 } else {
-                    log.warn("Cannot trigger unclean leader election for 
offline partition {}-{}: {}",
-                            topic.name, topicIdPartition.partitionId(), 
result.error());
+                    log.warn("Cannot trigger unclean leader election for 
offline partition {}-{}: {}. {}",
+                            topic.name, partitionId, result.error(), 
logPartitionInfo(topic.parts.get(partitionId)));
                 }
             } else if (log.isDebugEnabled()) {
                 log.debug("Cannot trigger unclean leader election for offline 
partition {}-{} " +
-                                "because unclean leader election is disabled 
for this topic.",
-                        topic.name, topicIdPartition.partitionId());
+                                "because unclean leader election is disabled 
for this topic. {}",
+                        topic.name, partitionId, 
logPartitionInfo(topic.parts.get(partitionId)));
             }
         }
     }
 
+    private static String logPartitionInfo(PartitionRegistration partition) {
+        return String.format("(isr: %s, replicaSet: %s, partitionEpoch: %d, 
leaderEpoch: %d)",
+            Arrays.toString(partition.isr), 
Arrays.toString(partition.replicas), partition.partitionEpoch,
+            partition.leaderEpoch);
+    }
+
     ControllerResult<List<CreatePartitionsTopicResult>> createPartitions(
         ControllerRequestContext context,
         List<CreatePartitionsTopic> topics
