This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d0457f7360e KAFKA-15109 Don't skip leader epoch bump while in
migration mode (#13890)
d0457f7360e is described below
commit d0457f7360e2e3d34800fbd41de429afbaabd101
Author: David Arthur <[email protected]>
AuthorDate: Wed Jun 21 13:09:05 2023 -0400
KAFKA-15109 Don't skip leader epoch bump while in migration mode (#13890)
While in migration mode, the KRaft controller must always bump the leader
epoch when shrinking an ISR.
This is required to maintain compatibility with the ZK brokers. Without the
epoch bump, the ZK brokers will ignore the partition state change present in
the LeaderAndIsrRequest, since it would not contain a new leader epoch.
Reviewers: Colin P. McCabe <[email protected]>
---
.../kafka/controller/PartitionChangeBuilder.java | 15 +++++++-
.../controller/ReplicationControlManager.java | 9 ++++-
.../controller/PartitionChangeBuilderTest.java | 44 +++++++++++++++++++---
3 files changed, 58 insertions(+), 10 deletions(-)
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
index 1c110e567a5..78c1c2363d7 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
@@ -81,6 +81,8 @@ public class PartitionChangeBuilder {
private List<Integer> targetAdding;
private Election election = Election.ONLINE;
private LeaderRecoveryState targetLeaderRecoveryState;
+ private boolean bumpLeaderEpochOnIsrShrink;
+
public PartitionChangeBuilder(
PartitionRegistration partition,
@@ -94,6 +96,8 @@ public class PartitionChangeBuilder {
this.partitionId = partitionId;
this.isAcceptableLeader = isAcceptableLeader;
this.metadataVersion = metadataVersion;
+ this.bumpLeaderEpochOnIsrShrink =
!metadataVersion.isSkipLeaderEpochBumpSupported();
+
this.targetIsr = Replicas.toList(partition.isr);
this.targetReplicas = Replicas.toList(partition.replicas);
this.targetRemoving = Replicas.toList(partition.removingReplicas);
@@ -140,6 +144,11 @@ public class PartitionChangeBuilder {
return this;
}
+ public PartitionChangeBuilder setBumpLeaderEpochOnIsrShrink(boolean
bumpLeaderEpochOnIsrShrink) {
+ this.bumpLeaderEpochOnIsrShrink = bumpLeaderEpochOnIsrShrink;
+ return this;
+ }
+
// VisibleForTesting
static class ElectionResult {
final int node;
@@ -273,13 +282,15 @@ public class PartitionChangeBuilder {
* that required that the leader epoch be bump whenever the ISR shrank. In
MV 3.6 this leader
* bump is not required when the ISR shrinks. Note, that the leader epoch
is never increased if
* the ISR expanded.
+ *
+ * In MV 3.6 and beyond, if the controller is in ZK migration mode, the
leader epoch must
+ * be bumped during ISR shrink for compatibility with ZK brokers.
*/
void triggerLeaderEpochBumpIfNeeded(PartitionChangeRecord record) {
if (record.leader() == NO_LEADER_CHANGE) {
if (!Replicas.contains(targetReplicas, partition.replicas)) {
record.setLeader(partition.leader);
- } else if (!metadataVersion.isSkipLeaderEpochBumpSupported() &&
- !Replicas.contains(targetIsr, partition.isr)) {
+ } else if (bumpLeaderEpochOnIsrShrink &&
!Replicas.contains(targetIsr, partition.isr)) {
record.setLeader(partition.leader);
}
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index f7d5e69ed8b..7af6ab8b317 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -995,6 +995,7 @@ public class ReplicationControlManager {
clusterControl::isActive,
featureControl.metadataVersion()
);
+
builder.setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed());
if
(configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name())) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
@@ -1382,7 +1383,7 @@ public class ReplicationControlManager {
clusterControl::isActive,
featureControl.metadataVersion()
);
- builder.setElection(election);
+
builder.setElection(election).setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed());
Optional<ApiMessageAndVersion> record = builder.build();
if (!record.isPresent()) {
if (electionType == ElectionType.PREFERRED) {
@@ -1517,7 +1518,8 @@ public class ReplicationControlManager {
clusterControl::isActive,
featureControl.metadataVersion()
);
- builder.setElection(PartitionChangeBuilder.Election.PREFERRED);
+ builder.setElection(PartitionChangeBuilder.Election.PREFERRED)
+
.setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed());
builder.build().ifPresent(records::add);
}
@@ -1738,6 +1740,7 @@ public class ReplicationControlManager {
isAcceptableLeader,
featureControl.metadataVersion()
);
+
builder.setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed());
if
(configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name)) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
@@ -1850,6 +1853,7 @@ public class ReplicationControlManager {
clusterControl::isActive,
featureControl.metadataVersion()
);
+
builder.setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed());
if
(configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
@@ -1907,6 +1911,7 @@ public class ReplicationControlManager {
clusterControl::isActive,
featureControl.metadataVersion()
);
+
builder.setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed());
if (!reassignment.replicas().equals(currentReplicas)) {
builder.setTargetReplicas(reassignment.replicas());
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
index 3374f12d7ac..c707642bebd 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
@@ -31,12 +31,14 @@ import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import java.util.function.IntPredicate;
+import java.util.stream.Stream;
import static
org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
import static org.apache.kafka.controller.PartitionChangeBuilder.Election;
@@ -46,6 +48,7 @@ import static
org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
@Timeout(value = 40)
@@ -200,6 +203,14 @@ public class PartitionChangeBuilderTest {
new PartitionChangeRecord(),
NO_LEADER_CHANGE
);
+ testTriggerLeaderEpochBumpIfNeededLeader(
+ createFooBuilder()
+ .setTargetIsrWithBrokerStates(
+
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1,
3, 4)))
+ .setBumpLeaderEpochOnIsrShrink(true),
+ new PartitionChangeRecord(),
+ NO_LEADER_CHANGE
+ );
testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder().
setTargetReplicas(Arrays.asList(2, 1, 3, 4)), new
PartitionChangeRecord(),
NO_LEADER_CHANGE);
@@ -217,6 +228,16 @@ public class PartitionChangeBuilderTest {
new PartitionChangeRecord(),
1
);
+
+ // KAFKA-15109: Shrinking the ISR while in ZK migration mode does
increase the leader epoch
+ testTriggerLeaderEpochBumpIfNeededLeader(
+ createFooBuilder()
+ .setTargetIsrWithBrokerStates(
+
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1)))
+ .setBumpLeaderEpochOnIsrShrink(true),
+ new PartitionChangeRecord(),
+ 1
+ );
}
@Test
@@ -382,9 +403,18 @@ public class PartitionChangeBuilderTest {
);
}
+ private static Stream<Arguments> leaderRecoveryAndZkMigrationParams() {
+ return Stream.of(
+ arguments(true, true),
+ arguments(true, false),
+ arguments(false, true),
+ arguments(false, false)
+ );
+ }
+
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testChangeInLeadershipDoesNotChangeRecoveryState(boolean
isLeaderRecoverySupported) {
+ @MethodSource("leaderRecoveryAndZkMigrationParams")
+ public void testChangeInLeadershipDoesNotChangeRecoveryState(boolean
isLeaderRecoverySupported, boolean zkMigrationsEnabled) {
final byte noChange = (byte) -1;
int leaderId = 1;
LeaderRecoveryState recoveryState = LeaderRecoveryState.RECOVERING;
@@ -407,6 +437,7 @@ public class PartitionChangeBuilderTest {
brokerId -> false,
metadataVersion
);
+ offlineBuilder.setBumpLeaderEpochOnIsrShrink(zkMigrationsEnabled);
// Set the target ISR to empty to indicate that the last leader is
offline
offlineBuilder.setTargetIsrWithBrokerStates(Collections.emptyList());
@@ -432,6 +463,7 @@ public class PartitionChangeBuilderTest {
brokerId -> true,
metadataVersion
);
+ onlineBuilder.setBumpLeaderEpochOnIsrShrink(zkMigrationsEnabled);
// The only broker in the ISR is elected leader and stays in the
recovering
changeRecord = (PartitionChangeRecord)
onlineBuilder.build().get().message();
@@ -445,8 +477,8 @@ public class PartitionChangeBuilderTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- void testUncleanSetsLeaderRecoveringState(boolean
isLeaderRecoverySupported) {
+ @MethodSource("leaderRecoveryAndZkMigrationParams")
+ void testUncleanSetsLeaderRecoveringState(boolean
isLeaderRecoverySupported, boolean zkMigrationsEnabled) {
final byte noChange = (byte) -1;
int leaderId = 1;
PartitionRegistration registration = new
PartitionRegistration.Builder().
@@ -468,7 +500,7 @@ public class PartitionChangeBuilderTest {
brokerId -> brokerId == leaderId,
metadataVersion
).setElection(Election.UNCLEAN);
-
+ onlineBuilder.setBumpLeaderEpochOnIsrShrink(zkMigrationsEnabled);
// The partition should stay as recovering
PartitionChangeRecord changeRecord = (PartitionChangeRecord)
onlineBuilder
.build()