This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new ba01ad0f0cf KAFKA-17973: Relax Restriction for Voters Set Change 
(#17728)
ba01ad0f0cf is described below

commit ba01ad0f0cfa69966be01c90f1c607a54b20ca2e
Author: Hailey Ni <136509748+hni61...@users.noreply.github.com>
AuthorDate: Mon Jan 13 08:44:46 2025 -0800

    KAFKA-17973: Relax Restriction for Voters Set Change (#17728)
    
    Relax the voter set change validation that exists in KRaft. When reading 
the kraft partition and validating voter set changes allow the voter set to 
have more than one change.
    
    This violates the invariant that after a voter change there are overlapping 
voters for all possible majorities. This is okay because the KRaft leader 
checks that there are no pending voter set updates when handling an add voter 
request and a remove voter request.
    
    Reviewers: José Armando García Sancio <jsan...@apache.org>
---
 .../internals/KRaftControlRecordStateMachine.java  |  2 +-
 .../kafka/raft/internals/VoterSetHistory.java      | 17 +++++---
 .../kafka/raft/internals/VoterSetHistoryTest.java  | 50 +++++++++++-----------
 3 files changed, 36 insertions(+), 33 deletions(-)

diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
 
b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
index c1d4a0b2f2d..e2b6f7e43e4 100644
--- 
a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
+++ 
b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
@@ -90,7 +90,7 @@ public final class KRaftControlRecordStateMachine {
         LogContext logContext
     ) {
         this.log = log;
-        this.voterSetHistory = new VoterSetHistory(staticVoterSet);
+        this.voterSetHistory = new VoterSetHistory(staticVoterSet, logContext);
         this.serde = serde;
         this.bufferSupplier = bufferSupplier;
         this.maxBatchSizeBytes = maxBatchSizeBytes;
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java
index 6ab304f8c16..5e25c603542 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java
@@ -16,8 +16,11 @@
  */
 package org.apache.kafka.raft.internals;
 
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.raft.VoterSet;
 
+import org.slf4j.Logger;
+
 import java.util.Optional;
 import java.util.OptionalLong;
 
@@ -31,9 +34,11 @@ import java.util.OptionalLong;
 public final class VoterSetHistory {
     private final VoterSet staticVoterSet;
     private final LogHistory<VoterSet> votersHistory = new 
TreeMapLogHistory<>();
+    private final Logger logger;
 
-    VoterSetHistory(VoterSet staticVoterSet) {
+    VoterSetHistory(VoterSet staticVoterSet, LogContext logContext) {
         this.staticVoterSet = staticVoterSet;
+        this.logger = logContext.logger(getClass());
     }
 
     /**
@@ -55,12 +60,10 @@ public final class VoterSetHistory {
             // all replicas.
             VoterSet lastVoterSet = lastEntry.get().value();
             if (!lastVoterSet.hasOverlappingMajority(voters)) {
-                throw new IllegalArgumentException(
-                    String.format(
-                        "Last voter set %s doesn't have an overlapping 
majority with the new voter set %s",
-                        lastVoterSet,
-                        voters
-                    )
+                logger.info(
+                    "Last voter set ({}) doesn't have an overlapping majority 
with the new voter set ({})",
+                    lastVoterSet,
+                    voters
                 );
             }
         }
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetHistoryTest.java 
b/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetHistoryTest.java
index 04f8aa8d365..302a1da5212 100644
--- 
a/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetHistoryTest.java
+++ 
b/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetHistoryTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.raft.internals;
 
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.raft.VoterSet;
 import org.apache.kafka.raft.VoterSetTest;
 
@@ -33,7 +34,7 @@ public final class VoterSetHistoryTest {
     @Test
     void testStaticVoterSet() {
         VoterSet staticVoterSet = 
VoterSet.fromMap(VoterSetTest.voterMap(IntStream.of(1, 2, 3), true));
-        VoterSetHistory votersHistory = new VoterSetHistory(staticVoterSet);
+        VoterSetHistory votersHistory = voterSetHistory(staticVoterSet);
 
         assertEquals(Optional.empty(), votersHistory.valueAtOrBefore(0));
         assertEquals(Optional.empty(), votersHistory.valueAtOrBefore(100));
@@ -54,7 +55,7 @@ public final class VoterSetHistoryTest {
 
     @Test
     void TestNoStaticVoterSet() {
-        VoterSetHistory votersHistory = new VoterSetHistory(VoterSet.empty());
+        VoterSetHistory votersHistory = voterSetHistory(VoterSet.empty());
 
         assertEquals(Optional.empty(), votersHistory.valueAtOrBefore(0));
         assertEquals(Optional.empty(), votersHistory.valueAtOrBefore(100));
@@ -65,7 +66,7 @@ public final class VoterSetHistoryTest {
     void testAddAt() {
         Map<Integer, VoterSet.VoterNode> voterMap = 
VoterSetTest.voterMap(IntStream.of(1, 2, 3), true);
         VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap));
-        VoterSetHistory votersHistory = new VoterSetHistory(staticVoterSet);
+        VoterSetHistory votersHistory = voterSetHistory(staticVoterSet);
 
         assertThrows(
             IllegalArgumentException.class,
@@ -95,7 +96,7 @@ public final class VoterSetHistoryTest {
     void testBootstrapAddAt() {
         Map<Integer, VoterSet.VoterNode> voterMap = 
VoterSetTest.voterMap(IntStream.of(1, 2, 3), true);
         VoterSet bootstrapVoterSet = VoterSet.fromMap(new HashMap<>(voterMap));
-        VoterSetHistory votersHistory = new VoterSetHistory(VoterSet.empty());
+        VoterSetHistory votersHistory = voterSetHistory(VoterSet.empty());
 
         votersHistory.addAt(-1, bootstrapVoterSet);
         assertEquals(bootstrapVoterSet, votersHistory.lastValue());
@@ -124,7 +125,7 @@ public final class VoterSetHistoryTest {
 
     @Test
     void testAddAtNonOverlapping() {
-        VoterSetHistory votersHistory = new VoterSetHistory(VoterSet.empty());
+        VoterSetHistory votersHistory = voterSetHistory(VoterSet.empty());
 
         Map<Integer, VoterSet.VoterNode> voterMap = 
VoterSetTest.voterMap(IntStream.of(1, 2, 3), true);
         VoterSet voterSet = VoterSet.fromMap(new HashMap<>(voterMap));
@@ -132,35 +133,30 @@ public final class VoterSetHistoryTest {
         // Add a starting voter to the history
         votersHistory.addAt(100, voterSet);
 
-        // Remove voter so that it doesn't overlap
-        VoterSet nonoverlappingRemovedSet = voterSet
+        // Assert multiple voters can be removed at a time
+        VoterSet nonOverlappingRemovedSet = voterSet
             .removeVoter(voterMap.get(1).voterKey()).get()
             .removeVoter(voterMap.get(2).voterKey()).get();
 
-        assertThrows(
-            IllegalArgumentException.class,
-            () -> votersHistory.addAt(200, nonoverlappingRemovedSet)
-        );
-        assertEquals(voterSet, votersHistory.lastValue());
+        votersHistory.addAt(200, nonOverlappingRemovedSet);
 
+        assertEquals(nonOverlappingRemovedSet, votersHistory.lastValue());
 
-        // Add voters so that it doesn't overlap
-        VoterSet nonoverlappingAddSet = voterSet
-            .addVoter(VoterSetTest.voterNode(4, true)).get()
-            .addVoter(VoterSetTest.voterNode(5, true)).get();
+        // Assert multiple voters can be added at a time
+        VoterSet nonOverlappingAddSet = nonOverlappingRemovedSet
+            .addVoter(VoterSetTest.voterNode(1, true)).get()
+            .addVoter(VoterSetTest.voterNode(2, true)).get();
 
-        assertThrows(
-            IllegalArgumentException.class,
-            () -> votersHistory.addAt(200, nonoverlappingAddSet)
-        );
-        assertEquals(voterSet, votersHistory.lastValue());
+        votersHistory.addAt(300, nonOverlappingAddSet);
+
+        assertEquals(nonOverlappingAddSet, votersHistory.lastValue());
     }
 
     @Test
     void testNonoverlappingFromStaticVoterSet() {
         Map<Integer, VoterSet.VoterNode> voterMap = 
VoterSetTest.voterMap(IntStream.of(1, 2, 3), true);
         VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap));
-        VoterSetHistory votersHistory = new VoterSetHistory(VoterSet.empty());
+        VoterSetHistory votersHistory = voterSetHistory(VoterSet.empty());
 
         // Remove voter so that it doesn't overlap
         VoterSet nonoverlappingRemovedSet = staticVoterSet
@@ -175,7 +171,7 @@ public final class VoterSetHistoryTest {
     void testTruncateTo() {
         Map<Integer, VoterSet.VoterNode> voterMap = 
VoterSetTest.voterMap(IntStream.of(1, 2, 3), true);
         VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap));
-        VoterSetHistory votersHistory = new VoterSetHistory(staticVoterSet);
+        VoterSetHistory votersHistory = voterSetHistory(staticVoterSet);
 
         // Add voter 4 to the voter set and voter set history
         voterMap.put(4, VoterSetTest.voterNode(4, true));
@@ -201,7 +197,7 @@ public final class VoterSetHistoryTest {
     void testTrimPrefixTo() {
         Map<Integer, VoterSet.VoterNode> voterMap = 
VoterSetTest.voterMap(IntStream.of(1, 2, 3), true);
         VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap));
-        VoterSetHistory votersHistory = new VoterSetHistory(staticVoterSet);
+        VoterSetHistory votersHistory = voterSetHistory(staticVoterSet);
 
         // Add voter 4 to the voter set and voter set history
         voterMap.put(4, VoterSetTest.voterNode(4, true));
@@ -234,7 +230,7 @@ public final class VoterSetHistoryTest {
     void testClear() {
         Map<Integer, VoterSet.VoterNode> voterMap = 
VoterSetTest.voterMap(IntStream.of(1, 2, 3), true);
         VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap));
-        VoterSetHistory votersHistory = new VoterSetHistory(staticVoterSet);
+        VoterSetHistory votersHistory = voterSetHistory(staticVoterSet);
 
         // Add voter 4 to the voter set and voter set history
         voterMap.put(4, VoterSetTest.voterNode(4, true));
@@ -250,4 +246,8 @@ public final class VoterSetHistoryTest {
 
         assertEquals(staticVoterSet, votersHistory.lastValue());
     }
+
+    private VoterSetHistory voterSetHistory(VoterSet staticVoterSet) {
+        return new VoterSetHistory(staticVoterSet, new LogContext());
+    }
 }

Reply via email to