This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fc7d912e8bb KAFKA-15109 Ensure the leader epoch bump occurs for older
MetadataVersions (#13910)
fc7d912e8bb is described below
commit fc7d912e8bb856500fa27b7455e6eff098c08196
Author: David Arthur <[email protected]>
AuthorDate: Tue Jun 27 11:49:20 2023 -0400
KAFKA-15109 Ensure the leader epoch bump occurs for older MetadataVersions
(#13910)
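This fixes a regression introduced by the previous KAFKA-15109 commit
(d0457f7360 on trunk). In that commit, PartitionChangeBuilder derived
bumpLeaderEpochOnIsrShrink from the MetadataVersion in its constructor, but
every call site in ReplicationControlManager then overwrote that value with
clusterControl.zkRegistrationAllowed(). As a result, a cluster on an older
MetadataVersion with ZK migration disabled no longer bumped the leader epoch
when the ISR shrank. The flag is now computed as
metadataVersion.isLeaderEpochBumpRequiredOnIsrShrink() || zkMigrationEnabled,
so the bump happens for any MetadataVersion older than IBP_3_6_IV0 regardless
of the ZK migration setting.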
Reviewers: Colin P. McCabe <[email protected]>, José Armando García Sancio
<[email protected]>
---
.../kafka/controller/PartitionChangeBuilder.java | 12 ++++--
.../controller/ReplicationControlManager.java | 12 +++---
.../controller/PartitionChangeBuilderTest.java | 50 +++++++++++++++++++---
.../kafka/server/common/MetadataVersion.java | 4 +-
4 files changed, 59 insertions(+), 19 deletions(-)
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
index 78c1c2363d7..78d22967e2a 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
@@ -81,7 +81,7 @@ public class PartitionChangeBuilder {
private List<Integer> targetAdding;
private Election election = Election.ONLINE;
private LeaderRecoveryState targetLeaderRecoveryState;
- private boolean bumpLeaderEpochOnIsrShrink;
+ private boolean zkMigrationEnabled;
public PartitionChangeBuilder(
@@ -96,7 +96,7 @@ public class PartitionChangeBuilder {
this.partitionId = partitionId;
this.isAcceptableLeader = isAcceptableLeader;
this.metadataVersion = metadataVersion;
- this.bumpLeaderEpochOnIsrShrink =
!metadataVersion.isSkipLeaderEpochBumpSupported();
+ this.zkMigrationEnabled = false;
this.targetIsr = Replicas.toList(partition.isr);
this.targetReplicas = Replicas.toList(partition.replicas);
@@ -144,8 +144,8 @@ public class PartitionChangeBuilder {
return this;
}
- public PartitionChangeBuilder setBumpLeaderEpochOnIsrShrink(boolean
bumpLeaderEpochOnIsrShrink) {
- this.bumpLeaderEpochOnIsrShrink = bumpLeaderEpochOnIsrShrink;
+ public PartitionChangeBuilder setZkMigrationEnabled(boolean
zkMigrationEnabled) {
+ this.zkMigrationEnabled = zkMigrationEnabled;
return this;
}
@@ -288,9 +288,13 @@ public class PartitionChangeBuilder {
*/
void triggerLeaderEpochBumpIfNeeded(PartitionChangeRecord record) {
if (record.leader() == NO_LEADER_CHANGE) {
+ boolean bumpLeaderEpochOnIsrShrink =
metadataVersion.isLeaderEpochBumpRequiredOnIsrShrink() || zkMigrationEnabled;
+
if (!Replicas.contains(targetReplicas, partition.replicas)) {
+ // Reassignment
record.setLeader(partition.leader);
} else if (bumpLeaderEpochOnIsrShrink &&
!Replicas.contains(targetIsr, partition.isr)) {
+ // ISR shrink
record.setLeader(partition.leader);
}
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 7af6ab8b317..f8631e4560b 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -995,7 +995,7 @@ public class ReplicationControlManager {
clusterControl::isActive,
featureControl.metadataVersion()
);
-
builder.setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed());
+
builder.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
if
(configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name())) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
@@ -1383,7 +1383,7 @@ public class ReplicationControlManager {
clusterControl::isActive,
featureControl.metadataVersion()
);
-
builder.setElection(election).setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed());
+
builder.setElection(election).setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
Optional<ApiMessageAndVersion> record = builder.build();
if (!record.isPresent()) {
if (electionType == ElectionType.PREFERRED) {
@@ -1519,7 +1519,7 @@ public class ReplicationControlManager {
featureControl.metadataVersion()
);
builder.setElection(PartitionChangeBuilder.Election.PREFERRED)
-
.setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed());
+ .setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
builder.build().ifPresent(records::add);
}
@@ -1740,7 +1740,7 @@ public class ReplicationControlManager {
isAcceptableLeader,
featureControl.metadataVersion()
);
-
builder.setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed());
+
builder.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
if
(configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name)) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
@@ -1853,7 +1853,7 @@ public class ReplicationControlManager {
clusterControl::isActive,
featureControl.metadataVersion()
);
-
builder.setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed());
+ builder.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
if
(configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
@@ -1911,7 +1911,7 @@ public class ReplicationControlManager {
clusterControl::isActive,
featureControl.metadataVersion()
);
-
builder.setBumpLeaderEpochOnIsrShrink(clusterControl.zkRegistrationAllowed());
+ builder.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
if (!reassignment.replicas().equals(currentReplicas)) {
builder.setTargetReplicas(reassignment.replicas());
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
index c707642bebd..c2b07956d86 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
@@ -93,7 +93,11 @@ public class PartitionChangeBuilderTest {
private final static Uuid FOO_ID =
Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
private static PartitionChangeBuilder createFooBuilder() {
- return new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3,
MetadataVersion.latest());
+ return createFooBuilder(MetadataVersion.latest());
+ }
+
+ private static PartitionChangeBuilder createFooBuilder(MetadataVersion
metadataVersion) {
+ return new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3,
metadataVersion);
}
private static final PartitionRegistration BAR = new
PartitionRegistration.Builder().
@@ -203,11 +207,12 @@ public class PartitionChangeBuilderTest {
new PartitionChangeRecord(),
NO_LEADER_CHANGE
);
+ // Expanding the ISR during migration doesn't increase leader epoch
testTriggerLeaderEpochBumpIfNeededLeader(
createFooBuilder()
.setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1,
3, 4)))
- .setBumpLeaderEpochOnIsrShrink(true),
+ .setZkMigrationEnabled(true),
new PartitionChangeRecord(),
NO_LEADER_CHANGE
);
@@ -228,13 +233,44 @@ public class PartitionChangeBuilderTest {
new PartitionChangeRecord(),
1
);
+ }
- // KAFKA-15109: Shrinking the ISR while in ZK migration mode does
increase the leader epoch
+ @Test
+ public void testLeaderEpochBumpZkMigration() {
+ // KAFKA-15109: Shrinking the ISR while in ZK migration mode requires
a leader epoch bump
testTriggerLeaderEpochBumpIfNeededLeader(
createFooBuilder()
.setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1)))
- .setBumpLeaderEpochOnIsrShrink(true),
+ .setZkMigrationEnabled(true),
+ new PartitionChangeRecord(),
+ 1
+ );
+
+ testTriggerLeaderEpochBumpIfNeededLeader(
+ createFooBuilder()
+ .setTargetIsrWithBrokerStates(
+
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1)))
+ .setZkMigrationEnabled(false),
+ new PartitionChangeRecord(),
+ NO_LEADER_CHANGE
+ );
+
+ // For older MV, always expect the epoch to increase regardless of ZK
migration
+ testTriggerLeaderEpochBumpIfNeededLeader(
+ createFooBuilder(MetadataVersion.IBP_3_5_IV2)
+ .setTargetIsrWithBrokerStates(
+
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1)))
+ .setZkMigrationEnabled(true),
+ new PartitionChangeRecord(),
+ 1
+ );
+
+ testTriggerLeaderEpochBumpIfNeededLeader(
+ createFooBuilder(MetadataVersion.IBP_3_5_IV2)
+ .setTargetIsrWithBrokerStates(
+
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1)))
+ .setZkMigrationEnabled(false),
new PartitionChangeRecord(),
1
);
@@ -437,7 +473,7 @@ public class PartitionChangeBuilderTest {
brokerId -> false,
metadataVersion
);
- offlineBuilder.setBumpLeaderEpochOnIsrShrink(zkMigrationsEnabled);
+ offlineBuilder.setZkMigrationEnabled(zkMigrationsEnabled);
// Set the target ISR to empty to indicate that the last leader is
offline
offlineBuilder.setTargetIsrWithBrokerStates(Collections.emptyList());
@@ -463,7 +499,7 @@ public class PartitionChangeBuilderTest {
brokerId -> true,
metadataVersion
);
- onlineBuilder.setBumpLeaderEpochOnIsrShrink(zkMigrationsEnabled);
+ onlineBuilder.setZkMigrationEnabled(zkMigrationsEnabled);
// The only broker in the ISR is elected leader and stays in the
recovering
changeRecord = (PartitionChangeRecord)
onlineBuilder.build().get().message();
@@ -500,7 +536,7 @@ public class PartitionChangeBuilderTest {
brokerId -> brokerId == leaderId,
metadataVersion
).setElection(Election.UNCLEAN);
- onlineBuilder.setBumpLeaderEpochOnIsrShrink(zkMigrationsEnabled);
+ onlineBuilder.setZkMigrationEnabled(zkMigrationsEnabled);
// The partition should stay as recovering
PartitionChangeRecord changeRecord = (PartitionChangeRecord)
onlineBuilder
.build()
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 2637aef7016..0ccfcc06be1 100644
---
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -263,8 +263,8 @@ public enum MetadataVersion {
return this.isAtLeast(IBP_3_5_IV2);
}
- public boolean isSkipLeaderEpochBumpSupported() {
- return this.isAtLeast(IBP_3_6_IV0);
+ public boolean isLeaderEpochBumpRequiredOnIsrShrink() {
+ return !this.isAtLeast(IBP_3_6_IV0);
}
public boolean isKRaftSupported() {