This is an automated email from the ASF dual-hosted git repository. jsancio pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 15e938ef438 KAFKA-17973: Relax Restriction for Voters Set Change (#17728) 15e938ef438 is described below commit 15e938ef438c64304a76cc13837164e65005e2f2 Author: Hailey Ni <136509748+hni61...@users.noreply.github.com> AuthorDate: Mon Jan 13 08:44:46 2025 -0800 KAFKA-17973: Relax Restriction for Voters Set Change (#17728) Relax the voter set change validation that exists in KRaft. When reading the KRaft partition and validating voter set changes, allow the voter set to have more than one change. This violates the invariant that after a voter change there are overlapping voters for all possible majorities. This is okay because the KRaft leader checks that there are no pending voter set updates when handling an add voter request or a remove voter request. Reviewers: José Armando García Sancio <jsan...@apache.org> --- .../internals/KRaftControlRecordStateMachine.java | 2 +- .../kafka/raft/internals/VoterSetHistory.java | 17 +++++--- .../kafka/raft/internals/VoterSetHistoryTest.java | 50 +++++++++++----------- 3 files changed, 36 insertions(+), 33 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java index c1d4a0b2f2d..e2b6f7e43e4 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java @@ -90,7 +90,7 @@ public final class KRaftControlRecordStateMachine { LogContext logContext ) { this.log = log; - this.voterSetHistory = new VoterSetHistory(staticVoterSet); + this.voterSetHistory = new VoterSetHistory(staticVoterSet, logContext); this.serde = serde; this.bufferSupplier = bufferSupplier; this.maxBatchSizeBytes = maxBatchSizeBytes; diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java
b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java index 6ab304f8c16..5e25c603542 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java @@ -16,8 +16,11 @@ */ package org.apache.kafka.raft.internals; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.raft.VoterSet; +import org.slf4j.Logger; + import java.util.Optional; import java.util.OptionalLong; @@ -31,9 +34,11 @@ import java.util.OptionalLong; public final class VoterSetHistory { private final VoterSet staticVoterSet; private final LogHistory<VoterSet> votersHistory = new TreeMapLogHistory<>(); + private final Logger logger; - VoterSetHistory(VoterSet staticVoterSet) { + VoterSetHistory(VoterSet staticVoterSet, LogContext logContext) { this.staticVoterSet = staticVoterSet; + this.logger = logContext.logger(getClass()); } /** @@ -55,12 +60,10 @@ public final class VoterSetHistory { // all replicas. 
VoterSet lastVoterSet = lastEntry.get().value(); if (!lastVoterSet.hasOverlappingMajority(voters)) { - throw new IllegalArgumentException( - String.format( - "Last voter set %s doesn't have an overlapping majority with the new voter set %s", - lastVoterSet, - voters - ) + logger.info( + "Last voter set ({}) doesn't have an overlapping majority with the new voter set ({})", + lastVoterSet, + voters ); } } diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetHistoryTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetHistoryTest.java index 04f8aa8d365..302a1da5212 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetHistoryTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetHistoryTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.raft.internals; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.raft.VoterSet; import org.apache.kafka.raft.VoterSetTest; @@ -33,7 +34,7 @@ public final class VoterSetHistoryTest { @Test void testStaticVoterSet() { VoterSet staticVoterSet = VoterSet.fromMap(VoterSetTest.voterMap(IntStream.of(1, 2, 3), true)); - VoterSetHistory votersHistory = new VoterSetHistory(staticVoterSet); + VoterSetHistory votersHistory = voterSetHistory(staticVoterSet); assertEquals(Optional.empty(), votersHistory.valueAtOrBefore(0)); assertEquals(Optional.empty(), votersHistory.valueAtOrBefore(100)); @@ -54,7 +55,7 @@ public final class VoterSetHistoryTest { @Test void TestNoStaticVoterSet() { - VoterSetHistory votersHistory = new VoterSetHistory(VoterSet.empty()); + VoterSetHistory votersHistory = voterSetHistory(VoterSet.empty()); assertEquals(Optional.empty(), votersHistory.valueAtOrBefore(0)); assertEquals(Optional.empty(), votersHistory.valueAtOrBefore(100)); @@ -65,7 +66,7 @@ public final class VoterSetHistoryTest { void testAddAt() { Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true); VoterSet 
staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap)); - VoterSetHistory votersHistory = new VoterSetHistory(staticVoterSet); + VoterSetHistory votersHistory = voterSetHistory(staticVoterSet); assertThrows( IllegalArgumentException.class, @@ -95,7 +96,7 @@ public final class VoterSetHistoryTest { void testBootstrapAddAt() { Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true); VoterSet bootstrapVoterSet = VoterSet.fromMap(new HashMap<>(voterMap)); - VoterSetHistory votersHistory = new VoterSetHistory(VoterSet.empty()); + VoterSetHistory votersHistory = voterSetHistory(VoterSet.empty()); votersHistory.addAt(-1, bootstrapVoterSet); assertEquals(bootstrapVoterSet, votersHistory.lastValue()); @@ -124,7 +125,7 @@ public final class VoterSetHistoryTest { @Test void testAddAtNonOverlapping() { - VoterSetHistory votersHistory = new VoterSetHistory(VoterSet.empty()); + VoterSetHistory votersHistory = voterSetHistory(VoterSet.empty()); Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true); VoterSet voterSet = VoterSet.fromMap(new HashMap<>(voterMap)); @@ -132,35 +133,30 @@ public final class VoterSetHistoryTest { // Add a starting voter to the history votersHistory.addAt(100, voterSet); - // Remove voter so that it doesn't overlap - VoterSet nonoverlappingRemovedSet = voterSet + // Assert multiple voters can be removed at a time + VoterSet nonOverlappingRemovedSet = voterSet .removeVoter(voterMap.get(1).voterKey()).get() .removeVoter(voterMap.get(2).voterKey()).get(); - assertThrows( - IllegalArgumentException.class, - () -> votersHistory.addAt(200, nonoverlappingRemovedSet) - ); - assertEquals(voterSet, votersHistory.lastValue()); + votersHistory.addAt(200, nonOverlappingRemovedSet); + assertEquals(nonOverlappingRemovedSet, votersHistory.lastValue()); - // Add voters so that it doesn't overlap - VoterSet nonoverlappingAddSet = voterSet - .addVoter(VoterSetTest.voterNode(4, true)).get() - 
.addVoter(VoterSetTest.voterNode(5, true)).get(); + // Assert multiple voters can be added at a time + VoterSet nonOverlappingAddSet = nonOverlappingRemovedSet + .addVoter(VoterSetTest.voterNode(1, true)).get() + .addVoter(VoterSetTest.voterNode(2, true)).get(); - assertThrows( - IllegalArgumentException.class, - () -> votersHistory.addAt(200, nonoverlappingAddSet) - ); - assertEquals(voterSet, votersHistory.lastValue()); + votersHistory.addAt(300, nonOverlappingAddSet); + + assertEquals(nonOverlappingAddSet, votersHistory.lastValue()); } @Test void testNonoverlappingFromStaticVoterSet() { Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true); VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap)); - VoterSetHistory votersHistory = new VoterSetHistory(VoterSet.empty()); + VoterSetHistory votersHistory = voterSetHistory(VoterSet.empty()); // Remove voter so that it doesn't overlap VoterSet nonoverlappingRemovedSet = staticVoterSet @@ -175,7 +171,7 @@ public final class VoterSetHistoryTest { void testTruncateTo() { Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true); VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap)); - VoterSetHistory votersHistory = new VoterSetHistory(staticVoterSet); + VoterSetHistory votersHistory = voterSetHistory(staticVoterSet); // Add voter 4 to the voter set and voter set history voterMap.put(4, VoterSetTest.voterNode(4, true)); @@ -201,7 +197,7 @@ public final class VoterSetHistoryTest { void testTrimPrefixTo() { Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true); VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap)); - VoterSetHistory votersHistory = new VoterSetHistory(staticVoterSet); + VoterSetHistory votersHistory = voterSetHistory(staticVoterSet); // Add voter 4 to the voter set and voter set history voterMap.put(4, VoterSetTest.voterNode(4, true)); @@ -234,7 
+230,7 @@ public final class VoterSetHistoryTest { void testClear() { Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true); VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap)); - VoterSetHistory votersHistory = new VoterSetHistory(staticVoterSet); + VoterSetHistory votersHistory = voterSetHistory(staticVoterSet); // Add voter 4 to the voter set and voter set history voterMap.put(4, VoterSetTest.voterNode(4, true)); @@ -250,4 +246,8 @@ public final class VoterSetHistoryTest { assertEquals(staticVoterSet, votersHistory.lastValue()); } + + private VoterSetHistory voterSetHistory(VoterSet staticVoterSet) { + return new VoterSetHistory(staticVoterSet, new LogContext()); + } }