This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 43db8ac  KAFKA-12897: KRaft multi-partition placement on single broker 
(#10823)
43db8ac is described below

commit 43db8ac86ac3e609d9e2a993444b8f1b22f7693b
Author: Ron Dagostino <[email protected]>
AuthorDate: Mon Jun 7 17:13:06 2021 -0400

    KAFKA-12897: KRaft multi-partition placement on single broker (#10823)
    
    #10494 introduced a bug in the KRaft controller where the controller will 
loop forever in StripedReplicaPlacer trying to identify the racks on which to 
place partition replicas if there is a single unfenced broker in the cluster 
and the number of requested partitions in a CREATE_TOPICS request is greater 
than 1.
    
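    This patch fixes the loop by no longer resetting the iteration epoch when 
there is only a single unfenced broker. It also factors the argument sanity 
checks out into shared helper methods, invokes those checks in both RackList 
and StripedReplicaPlacer, and adds tests for the checks as well as for the 
single-broker placement issue.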
    
    Reviewers: Jun Rao <[email protected]>
---
 .../kafka/controller/StripedReplicaPlacer.java     | 42 +++++++++-----
 .../kafka/controller/StripedReplicaPlacerTest.java | 65 ++++++++++++++++++++++
 2 files changed, 94 insertions(+), 13 deletions(-)

diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java 
b/metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java
index a2aadc5..031354c 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java
@@ -340,14 +340,14 @@ public class StripedReplicaPlacer implements 
ReplicaPlacer {
         }
 
         List<Integer> place(int replicationFactor) {
-            if (replicationFactor <= 0) {
-                throw new InvalidReplicationFactorException("Invalid 
replication factor " +
-                        replicationFactor + ": the replication factor must be 
positive.");
-            }
+            throwInvalidReplicationFactorIfNonPositive(replicationFactor);
+            throwInvalidReplicationFactorIfTooFewBrokers(replicationFactor, 
numTotalBrokers());
+            throwInvalidReplicationFactorIfZero(numUnfencedBrokers());
             // If we have returned as many assignments as there are unfenced 
brokers in
             // the cluster, shuffle the rack list and broker lists to try to 
avoid
             // repeating the same assignments again.
-            if (epoch == numUnfencedBrokers) {
+            // But don't reset the iteration epoch for a single unfenced 
broker -- otherwise we would loop forever
+            if (epoch == numUnfencedBrokers && numUnfencedBrokers > 1) {
                 shuffle();
                 epoch = 0;
             }
@@ -400,6 +400,27 @@ public class StripedReplicaPlacer implements ReplicaPlacer 
{
         }
     }
 
+    private static void throwInvalidReplicationFactorIfNonPositive(int 
replicationFactor) {
+        if (replicationFactor <= 0) {
+            throw new InvalidReplicationFactorException("Invalid replication 
factor " +
+                    replicationFactor + ": the replication factor must be 
positive.");
+        }
+    }
+
+    private static void throwInvalidReplicationFactorIfZero(int numUnfenced) {
+        if (numUnfenced == 0) {
+            throw new InvalidReplicationFactorException("All brokers are 
currently fenced.");
+        }
+    }
+
+    private static void throwInvalidReplicationFactorIfTooFewBrokers(int 
replicationFactor, int numTotalBrokers) {
+        if (replicationFactor > numTotalBrokers) {
+            throw new InvalidReplicationFactorException("The target 
replication factor " +
+                    "of " + replicationFactor + " cannot be reached because 
only " +
+                    numTotalBrokers + " broker(s) are registered.");
+        }
+    }
+
     private final Random random;
 
     public StripedReplicaPlacer(Random random) {
@@ -412,14 +433,9 @@ public class StripedReplicaPlacer implements ReplicaPlacer 
{
                                      short replicationFactor,
                                      Iterator<UsableBroker> iterator) {
         RackList rackList = new RackList(random, iterator);
-        if (rackList.numUnfencedBrokers() == 0) {
-            throw new InvalidReplicationFactorException("All brokers are 
currently fenced.");
-        }
-        if (replicationFactor > rackList.numTotalBrokers()) {
-            throw new InvalidReplicationFactorException("The target 
replication factor " +
-                "of " + replicationFactor + " cannot be reached because only " 
+
-                rackList.numTotalBrokers() + " broker(s) are registered.");
-        }
+        throwInvalidReplicationFactorIfNonPositive(replicationFactor);
+        throwInvalidReplicationFactorIfZero(rackList.numUnfencedBrokers());
+        throwInvalidReplicationFactorIfTooFewBrokers(replicationFactor, 
rackList.numTotalBrokers());
         List<List<Integer>> placements = new ArrayList<>(numPartitions);
         for (int partition = 0; partition < numPartitions; partition++) {
             placements.add(rackList.place(replicationFactor));
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java
index 667e900..c3fbb09 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java
@@ -85,6 +85,22 @@ public class StripedReplicaPlacerTest {
     }
 
     /**
+     * Test that we perform striped replica placement as expected for a 
multi-partition topic
+     * on a single unfenced broker
+     */
+    @Test
+    public void testMultiPartitionTopicPlacementOnSingleUnfencedBroker() {
+        MockRandom random = new MockRandom();
+        StripedReplicaPlacer placer = new StripedReplicaPlacer(random);
+        assertEquals(Arrays.asList(Arrays.asList(0),
+                Arrays.asList(0),
+                Arrays.asList(0)),
+                placer.place(0, 3, (short) 1, Arrays.asList(
+                        new UsableBroker(0, Optional.empty(), false),
+                        new UsableBroker(1, Optional.empty(), 
true)).iterator()));
+    }
+
+    /**
      * Test that we will place on the fenced replica if we need to.
      */
     @Test
@@ -168,6 +184,17 @@ public class StripedReplicaPlacerTest {
     }
 
     @Test
+    public void testNonPositiveReplicationFactor() {
+        MockRandom random = new MockRandom();
+        StripedReplicaPlacer placer = new StripedReplicaPlacer(random);
+        assertEquals("Invalid replication factor 0: the replication factor 
must be positive.",
+                assertThrows(InvalidReplicationFactorException.class,
+                        () -> placer.place(0, 1, (short) 0, Arrays.asList(
+                                new UsableBroker(11, Optional.of("1"), false),
+                                new UsableBroker(10, Optional.of("1"), 
false)).iterator())).getMessage());
+    }
+
+    @Test
     public void testSuccessfulPlacement() {
         MockRandom random = new MockRandom();
         StripedReplicaPlacer placer = new StripedReplicaPlacer(random);
@@ -210,4 +237,42 @@ public class StripedReplicaPlacerTest {
         assertEquals(11, counts.get(Arrays.asList(3, 2)));
     }
 
+    @Test
+    public void testRackListAllBrokersFenced() {
+        // ensure placement is rejected when every registered broker is fenced
+        MockRandom random = new MockRandom();
+        RackList rackList = new RackList(random, Arrays.asList(
+                new UsableBroker(0, Optional.empty(), true),
+                new UsableBroker(1, Optional.empty(), true),
+                new UsableBroker(2, Optional.empty(), true)).iterator());
+        assertEquals(3, rackList.numTotalBrokers());
+        assertEquals(0, rackList.numUnfencedBrokers());
+        assertEquals(Collections.singletonList(Optional.empty()), 
rackList.rackNames());
+        assertEquals("All brokers are currently fenced.",
+                assertThrows(InvalidReplicationFactorException.class,
+                        () -> rackList.place(3)).getMessage());
+    }
+
+    @Test
+    public void testRackListNotEnoughBrokers() {
+        MockRandom random = new MockRandom();
+        RackList rackList = new RackList(random, Arrays.asList(
+                new UsableBroker(11, Optional.of("1"), false),
+                new UsableBroker(10, Optional.of("1"), false)).iterator());
+        assertEquals("The target replication factor of 3 cannot be reached 
because only " +
+                        "2 broker(s) are registered.",
+                assertThrows(InvalidReplicationFactorException.class,
+                        () -> rackList.place(3)).getMessage());
+    }
+
+    @Test
+    public void testRackListNonPositiveReplicationFactor() {
+        MockRandom random = new MockRandom();
+        RackList rackList = new RackList(random, Arrays.asList(
+                new UsableBroker(11, Optional.of("1"), false),
+                new UsableBroker(10, Optional.of("1"), false)).iterator());
+        assertEquals("Invalid replication factor -1: the replication factor 
must be positive.",
+                assertThrows(InvalidReplicationFactorException.class,
+                        () -> rackList.place(-1)).getMessage());
+    }
 }

Reply via email to