This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new bcb5d167fd1 [KAFKA-17870] Fail CreateTopicsRequest if total number of
partitions exceeds 10k (#17604)
bcb5d167fd1 is described below
commit bcb5d167fd196c8ad9cfc179eb76d02eecb7dd1d
Author: Jonah Hooper <[email protected]>
AuthorDate: Thu Oct 31 16:54:03 2024 -0400
[KAFKA-17870] Fail CreateTopicsRequest if total number of partitions
exceeds 10k (#17604)
We fail the entire CreateTopicsRequest if more than 10k partitions in total
would be created across all of the topics in this request. The usual pattern
for this API is to try to succeed with as many topics as possible. However,
since the 10k limit applies to all topics in the request combined, no topic
should be created if together they exceed it.
Reviewers: Colin P. McCabe <[email protected]>
---
.../kafka/server/KRaftClusterTest.scala | 2 +-
.../controller/ReplicationControlManager.java | 31 ++++++++++++++++++
.../controller/ReplicationControlManagerTest.java | 38 ++++++++++++++++++++++
3 files changed, 70 insertions(+), 1 deletion(-)
diff --git
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 7e74ced9a02..18402e54083 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -1289,7 +1289,7 @@ class KRaftClusterTest {
() => admin.createTopics(newTopics).all().get())
assertNotNull(executionException.getCause)
assertEquals(classOf[PolicyViolationException],
executionException.getCause.getClass)
- assertEquals("Unable to perform excessively large batch operation.",
+ assertEquals("Excessively large number of partitions per request.",
executionException.getCause.getMessage)
} finally {
admin.close()
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index e7a4359fb54..69b9ac7bde0 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -152,6 +152,7 @@ import static
org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
*/
public class ReplicationControlManager {
static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000;
+ static final int MAX_PARTITIONS_PER_BATCH = 10_000;
static class Builder {
private SnapshotRegistry snapshotRegistry = null;
@@ -592,6 +593,8 @@ public class ReplicationControlManager {
Map<String, ApiError> topicErrors = new HashMap<>();
List<ApiMessageAndVersion> records =
BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
+ validateTotalNumberOfPartitions(request, defaultNumPartitions);
+
// Check the topic names.
validateNewTopicNames(topicErrors, request.topics(),
topicsWithCollisionChars);
@@ -1137,6 +1140,34 @@ public class ReplicationControlManager {
return ControllerResult.of(records, response);
}
+    /**
+     * Validates that a batch of topics will not create more than {@value MAX_PARTITIONS_PER_BATCH} partitions in
+     * total. Creating more partitions than this in a single request has led to out-of-memory exceptions, so this
+     * check fails the whole request early, before that memory is allocated.
+     * <p>
+     * The computed count is an upper bound: the actual number may be smaller if some topics are misconfigured.
+     * For a topic with replica assignments, each assignment counts as one partition. Otherwise a partition count
+     * of -1 counts as {@code defaultNumPartitions}, a positive count is used as-is, and any other value
+     * contributes nothing.
+     * <p>
+     * The running total is kept in a {@code long} so that very large per-topic partition counts cannot overflow
+     * {@code int} and wrap around to a small or negative value that would slip past the limit.
+     *
+     * @param request a batch of topics to create.
+     * @param defaultNumPartitions default number of partitions to assign if unspecified.
+     * @throws PolicyViolationException if total number of partitions exceeds {@value MAX_PARTITIONS_PER_BATCH}.
+     */
+    static void validateTotalNumberOfPartitions(CreateTopicsRequestData request, int defaultNumPartitions) {
+        // long, not int: summing untrusted int partition counts could otherwise overflow and bypass the check.
+        long totalPartitions = 0;
+        for (CreatableTopic topic : request.topics()) {
+            if (!topic.assignments().isEmpty()) {
+                // Explicit replica assignments: one entry per partition.
+                totalPartitions += topic.assignments().size();
+            } else if (topic.numPartitions() == -1) {
+                // -1 means the partition count is unspecified, so the default applies.
+                totalPartitions += defaultNumPartitions;
+            } else if (topic.numPartitions() > 0) {
+                totalPartitions += topic.numPartitions();
+            }
+        }
+        if (totalPartitions > MAX_PARTITIONS_PER_BATCH) {
+            throw new PolicyViolationException("Excessively large number of partitions per request.");
+        }
+    }
+
/**
* Validate the partition information included in the alter partition
request.
*
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index b50d32e16bc..9a335a09353 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -573,6 +573,44 @@ public class ReplicationControlManagerTest {
}
}
+ @Test
+ public void testExcessiveNumberOfTopicsCannotBeCreated() {
+ // number of partitions is explicitly set without assignments
+ ReplicationControlTestContext ctx = new
ReplicationControlTestContext.Builder().build();
+ ReplicationControlManager replicationControl = ctx.replicationControl;
+ CreateTopicsRequestData request = new CreateTopicsRequestData();
+ request.topics().add(new CreatableTopic().setName("foo").
+ setNumPartitions(5000).setReplicationFactor((short) 1));
+ request.topics().add(new CreatableTopic().setName("bar").
+ setNumPartitions(5000).setReplicationFactor((short) 1));
+ request.topics().add(new CreatableTopic().setName("baz").
+ setNumPartitions(1).setReplicationFactor((short) 1));
+ ControllerRequestContext requestContext =
anonymousContextFor(ApiKeys.CREATE_TOPICS);
+ PolicyViolationException error = assertThrows(
+ PolicyViolationException.class,
+ () -> replicationControl.createTopics(requestContext, request,
Set.of("foo", "bar", "baz")));
+ assertEquals(error.getMessage(), "Excessively large number of
partitions per request.");
+ }
+
+    @Test
+    public void testExcessiveNumberOfTopicsCannotBeCreatedWithAssignments() {
+        // "foo" leaves its partition count unspecified (-1), so it counts as defaultNumPartitions (9999 below);
+        // "baz" has two explicit replica assignments, each counting as one partition.
+        // 9999 + 2 = 10001 exceeds MAX_PARTITIONS_PER_BATCH (10,000).
+        CreateTopicsRequestData request = new CreateTopicsRequestData();
+        request.topics().add(new CreatableTopic().setName("foo").
+            setNumPartitions(-1).setReplicationFactor((short) 1));
+        CreateTopicsRequestData.CreatableReplicaAssignmentCollection assignments =
+            new CreateTopicsRequestData.CreatableReplicaAssignmentCollection();
+        assignments.add(new CreatableReplicaAssignment().setPartitionIndex(1));
+        assignments.add(new CreatableReplicaAssignment().setPartitionIndex(2));
+        request.topics().add(new CreatableTopic()
+            .setName("baz")
+            .setAssignments(assignments));
+        PolicyViolationException error = assertThrows(
+            PolicyViolationException.class,
+            () -> ReplicationControlManager.validateTotalNumberOfPartitions(request, 9999)
+        );
+        // JUnit convention: expected value first, actual value second.
+        assertEquals("Excessively large number of partitions per request.", error.getMessage());
+    }
+
@Test
public void testCreateTopics() {
ReplicationControlTestContext ctx = new
ReplicationControlTestContext.Builder().build();