This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 43db8ac KAFKA-12897: KRaft multi-partition placement on single broker (#10823)
43db8ac is described below
commit 43db8ac86ac3e609d9e2a993444b8f1b22f7693b
Author: Ron Dagostino <[email protected]>
AuthorDate: Mon Jun 7 17:13:06 2021 -0400
KAFKA-12897: KRaft multi-partition placement on single broker (#10823)
#10494 introduced a bug in the KRaft controller: if there is a single unfenced
broker in the cluster and a CREATE_TOPICS request asks for more than one
partition, the controller loops forever in StripedReplicaPlacer while trying to
identify the racks on which to place the partition replicas.
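This patch fixes the infinite loop by no longer resetting the iteration epoch
(and reshuffling) in RackList.place() when only one broker is unfenced. It also
extracts the argument sanity checks into shared helper methods and invokes them
in both RackList and StripedReplicaPlacer. Finally, it adds tests for these
checks as well as for the single-broker placement issue.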
Reviewers: Jun Rao <[email protected]>
---
.../kafka/controller/StripedReplicaPlacer.java | 42 +++++++++-----
.../kafka/controller/StripedReplicaPlacerTest.java | 65 ++++++++++++++++++++++
2 files changed, 94 insertions(+), 13 deletions(-)
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java
b/metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java
index a2aadc5..031354c 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java
@@ -340,14 +340,14 @@ public class StripedReplicaPlacer implements
ReplicaPlacer {
}
List<Integer> place(int replicationFactor) {
- if (replicationFactor <= 0) {
- throw new InvalidReplicationFactorException("Invalid
replication factor " +
- replicationFactor + ": the replication factor must be
positive.");
- }
+ throwInvalidReplicationFactorIfNonPositive(replicationFactor);
+ throwInvalidReplicationFactorIfTooFewBrokers(replicationFactor,
numTotalBrokers());
+ throwInvalidReplicationFactorIfZero(numUnfencedBrokers());
// If we have returned as many assignments as there are unfenced
brokers in
// the cluster, shuffle the rack list and broker lists to try to
avoid
// repeating the same assignments again.
- if (epoch == numUnfencedBrokers) {
+ // But don't reset the iteration epoch for a single unfenced
broker -- otherwise we would loop forever
+ if (epoch == numUnfencedBrokers && numUnfencedBrokers > 1) {
shuffle();
epoch = 0;
}
@@ -400,6 +400,27 @@ public class StripedReplicaPlacer implements ReplicaPlacer
{
}
}
+ private static void throwInvalidReplicationFactorIfNonPositive(int
replicationFactor) {
+ if (replicationFactor <= 0) {
+ throw new InvalidReplicationFactorException("Invalid replication
factor " +
+ replicationFactor + ": the replication factor must be
positive.");
+ }
+ }
+
+ private static void throwInvalidReplicationFactorIfZero(int numUnfenced) {
+ if (numUnfenced == 0) {
+ throw new InvalidReplicationFactorException("All brokers are
currently fenced.");
+ }
+ }
+
+ private static void throwInvalidReplicationFactorIfTooFewBrokers(int
replicationFactor, int numTotalBrokers) {
+ if (replicationFactor > numTotalBrokers) {
+ throw new InvalidReplicationFactorException("The target
replication factor " +
+ "of " + replicationFactor + " cannot be reached because
only " +
+ numTotalBrokers + " broker(s) are registered.");
+ }
+ }
+
private final Random random;
public StripedReplicaPlacer(Random random) {
@@ -412,14 +433,9 @@ public class StripedReplicaPlacer implements ReplicaPlacer
{
short replicationFactor,
Iterator<UsableBroker> iterator) {
RackList rackList = new RackList(random, iterator);
- if (rackList.numUnfencedBrokers() == 0) {
- throw new InvalidReplicationFactorException("All brokers are
currently fenced.");
- }
- if (replicationFactor > rackList.numTotalBrokers()) {
- throw new InvalidReplicationFactorException("The target
replication factor " +
- "of " + replicationFactor + " cannot be reached because only "
+
- rackList.numTotalBrokers() + " broker(s) are registered.");
- }
+ throwInvalidReplicationFactorIfNonPositive(replicationFactor);
+ throwInvalidReplicationFactorIfZero(rackList.numUnfencedBrokers());
+ throwInvalidReplicationFactorIfTooFewBrokers(replicationFactor,
rackList.numTotalBrokers());
List<List<Integer>> placements = new ArrayList<>(numPartitions);
for (int partition = 0; partition < numPartitions; partition++) {
placements.add(rackList.place(replicationFactor));
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java
index 667e900..c3fbb09 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java
@@ -85,6 +85,22 @@ public class StripedReplicaPlacerTest {
}
/**
+ * Test that we perform striped replica placement as expected for a
multi-partition topic
+ * on a single unfenced broker
+ */
+ @Test
+ public void testMultiPartitionTopicPlacementOnSingleUnfencedBroker() {
+ MockRandom random = new MockRandom();
+ StripedReplicaPlacer placer = new StripedReplicaPlacer(random);
+ assertEquals(Arrays.asList(Arrays.asList(0),
+ Arrays.asList(0),
+ Arrays.asList(0)),
+ placer.place(0, 3, (short) 1, Arrays.asList(
+ new UsableBroker(0, Optional.empty(), false),
+ new UsableBroker(1, Optional.empty(),
true)).iterator()));
+ }
+
+ /**
* Test that we will place on the fenced replica if we need to.
*/
@Test
@@ -168,6 +184,17 @@ public class StripedReplicaPlacerTest {
}
@Test
+ public void testNonPositiveReplicationFactor() {
+ MockRandom random = new MockRandom();
+ StripedReplicaPlacer placer = new StripedReplicaPlacer(random);
+ assertEquals("Invalid replication factor 0: the replication factor
must be positive.",
+ assertThrows(InvalidReplicationFactorException.class,
+ () -> placer.place(0, 1, (short) 0, Arrays.asList(
+ new UsableBroker(11, Optional.of("1"), false),
+ new UsableBroker(10, Optional.of("1"),
false)).iterator())).getMessage());
+ }
+
+ @Test
public void testSuccessfulPlacement() {
MockRandom random = new MockRandom();
StripedReplicaPlacer placer = new StripedReplicaPlacer(random);
@@ -210,4 +237,42 @@ public class StripedReplicaPlacerTest {
assertEquals(11, counts.get(Arrays.asList(3, 2)));
}
+ @Test
+ public void testRackListAllBrokersFenced() {
+ // ensure we can place N replicas on a rack when the rack has less
than N brokers
+ MockRandom random = new MockRandom();
+ RackList rackList = new RackList(random, Arrays.asList(
+ new UsableBroker(0, Optional.empty(), true),
+ new UsableBroker(1, Optional.empty(), true),
+ new UsableBroker(2, Optional.empty(), true)).iterator());
+ assertEquals(3, rackList.numTotalBrokers());
+ assertEquals(0, rackList.numUnfencedBrokers());
+ assertEquals(Collections.singletonList(Optional.empty()),
rackList.rackNames());
+ assertEquals("All brokers are currently fenced.",
+ assertThrows(InvalidReplicationFactorException.class,
+ () -> rackList.place(3)).getMessage());
+ }
+
+ @Test
+ public void testRackListNotEnoughBrokers() {
+ MockRandom random = new MockRandom();
+ RackList rackList = new RackList(random, Arrays.asList(
+ new UsableBroker(11, Optional.of("1"), false),
+ new UsableBroker(10, Optional.of("1"), false)).iterator());
+ assertEquals("The target replication factor of 3 cannot be reached
because only " +
+ "2 broker(s) are registered.",
+ assertThrows(InvalidReplicationFactorException.class,
+ () -> rackList.place(3)).getMessage());
+ }
+
+ @Test
+ public void testRackListNonPositiveReplicationFactor() {
+ MockRandom random = new MockRandom();
+ RackList rackList = new RackList(random, Arrays.asList(
+ new UsableBroker(11, Optional.of("1"), false),
+ new UsableBroker(10, Optional.of("1"), false)).iterator());
+ assertEquals("Invalid replication factor -1: the replication factor
must be positive.",
+ assertThrows(InvalidReplicationFactorException.class,
+ () -> rackList.place(-1)).getMessage());
+ }
}