soarez commented on code in PR #15810: URL: https://github.com/apache/kafka/pull/15810#discussion_r1581762636
########## metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java: ########## @@ -365,45 +369,62 @@ private void tryElection(PartitionChangeRecord record) { } /** - * Trigger a leader epoch bump if one is needed. - * - * We need to bump the leader epoch if: - * 1. The leader changed, or - * 2. The new replica list does not contain all the nodes that the old replica list did. - * - * Changes that do NOT fall in any of these categories will increase the partition epoch, but - * not the leader epoch. Note that if the leader epoch increases, the partition epoch will - * always increase as well; there is no case where the partition epoch increases more slowly - * than the leader epoch. + * Trigger a leader epoch bump if one is needed because of replica reassignment. * - * If the PartitionChangeRecord sets the leader field to something other than - * NO_LEADER_CHANGE, a leader epoch bump will automatically occur. That takes care of - * case 1. In this function, we check for cases 2 and 3, and handle them by manually - * setting record.leader to the current leader. - * - * In MV before 3.6 there was a bug (KAFKA-15021) in the brokers' replica manager - * that required that the leader epoch be bump whenever the ISR shrank. In MV 3.6 this leader - * bump is not required when the ISR shrinks. Note, that the leader epoch is never increased if - * the ISR expanded. + * Note that if the leader epoch increases, the partition epoch will always increase as well; there is no + * case where the partition epoch increases more slowly than the leader epoch. + */ + void triggerLeaderEpochBumpForReplicaReassignmentIfNeeded(PartitionChangeRecord record) { Review Comment: This issue predates the PR, but I want to say it anyway: it's not great that both of these methods are named and documented in a way that suggests they perform a leader epoch bump, when all they actually do is set the leader field in the change record.
One needs to know about `PartitionRegistration#merge(PartitionChangeRecord)` to make the link between setting the leader on the PartitionChangeRecord and provoking a leader epoch bump. ########## metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java: ########## @@ -365,45 +369,62 @@ private void tryElection(PartitionChangeRecord record) { } /** - * Trigger a leader epoch bump if one is needed. - * - * We need to bump the leader epoch if: - * 1. The leader changed, or - * 2. The new replica list does not contain all the nodes that the old replica list did. - * - * Changes that do NOT fall in any of these categories will increase the partition epoch, but - * not the leader epoch. Note that if the leader epoch increases, the partition epoch will - * always increase as well; there is no case where the partition epoch increases more slowly - * than the leader epoch. + * Trigger a leader epoch bump if one is needed because of replica reassignment. * - * If the PartitionChangeRecord sets the leader field to something other than - * NO_LEADER_CHANGE, a leader epoch bump will automatically occur. That takes care of - * case 1. In this function, we check for cases 2 and 3, and handle them by manually - * setting record.leader to the current leader. - * - * In MV before 3.6 there was a bug (KAFKA-15021) in the brokers' replica manager - * that required that the leader epoch be bump whenever the ISR shrank. In MV 3.6 this leader - * bump is not required when the ISR shrinks. Note, that the leader epoch is never increased if - * the ISR expanded. + * Note that if the leader epoch increases, the partition epoch will always increase as well; there is no + * case where the partition epoch increases more slowly than the leader epoch. + */ + void triggerLeaderEpochBumpForReplicaReassignmentIfNeeded(PartitionChangeRecord record) { + if (record.leader() != NO_LEADER_CHANGE) { + // The leader is already changing, so there will already be a leader epoch bump. 
+ return; + } + if (!Replicas.contains(targetReplicas, partition.replicas)) { + // If the new replica list does not contain all the brokers that the old one did, + // ensure that there will be a leader epoch bump by setting the leader field. + record.setLeader(partition.leader); + } + } + + /** + * Trigger a leader epoch bump if one is needed because of an ISR shrink. * - * In MV 3.6 and beyond, if the controller is in ZK migration mode, the leader epoch must - * be bumped during ISR shrink for compatability with ZK brokers. + * Note that it's important to call this function only after we have set the ISR field in + * the PartitionChangeRecord. */ - void triggerLeaderEpochBumpIfNeeded(PartitionChangeRecord record) { - if (record.leader() == NO_LEADER_CHANGE) { - boolean bumpLeaderEpochOnIsrShrink = metadataVersion.isLeaderEpochBumpRequiredOnIsrShrink() || zkMigrationEnabled; - - if (!Replicas.contains(targetReplicas, partition.replicas)) { - // Reassignment - record.setLeader(partition.leader); - } else if (bumpLeaderEpochOnIsrShrink && !Replicas.contains(targetIsr, partition.isr)) { - // ISR shrink - record.setLeader(partition.leader); - } + void triggerLeaderEpochBumpForIsrShrinkIfNeeded(PartitionChangeRecord record) { + if (!(metadataVersion.isLeaderEpochBumpRequiredOnIsrShrink() || zkMigrationEnabled)) { + // We only need to bump the leader epoch on an ISR shrink in two cases: + // + // 1. In older metadata versions before 3.6, there was a bug (KAFKA-15021) in the + // broker replica manager that required that the leader epoch be bumped whenever + // the ISR shrank. (This was never necessary for EXPANSIONS, only SHRINKS.) + // + // 2. During ZK migration, we bump the leader epoch during all ISR shrinks, in order + // to maintain compatibility with migrating brokers that are still in ZK mode. + // + // If we're not in either case, we can exit here. 
+ return; + } + if (record.leader() != NO_LEADER_CHANGE) { + // The leader is already changing, so there will already be a leader epoch bump. + return; + } + if (record.isr() == null) { + // The ISR is not changing. + return; + } + if (!Replicas.contains(record.isr(), partition.isr)) { + // If the new ISR list does not contain all the brokers that the old one did, + // ensure that there will be a leader epoch bump by setting the leader field. + record.setLeader(partition.leader); } } - private void completeReassignmentIfNeeded() { + /** + * @return true if the reassignment was completed; false otherwise. + */ + private boolean completeReassignmentIfNeeded() { Review Comment: This change doesn't seem to be necessary. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org