This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 18b8b992f9b [KAFKA-17870] Fail CreateTopicsRequest if total number of
partitions exceeds 10k (#17604)
18b8b992f9b is described below
commit 18b8b992f9baa91d9785d491f7cce2867ecb6ba9
Author: Jonah Hooper <[email protected]>
AuthorDate: Thu Oct 31 16:54:03 2024 -0400
[KAFKA-17870] Fail CreateTopicsRequest if total number of partitions
exceeds 10k (#17604)
We fail the entire CreateTopicsRequest action if there are more than 10k
total
partitions being created in this topic for this specific request. The usual
pattern for
this API to try and succeed with some topics. Since the 10k limit applies
to all topics
then no topic should be created if they all exceede it.
Reviewers: Colin P. McCabe <[email protected]>
---
.../kafka/server/KRaftClusterTest.scala | 2 +-
.../controller/ReplicationControlManager.java | 31 ++++++++++++++++++
.../controller/ReplicationControlManagerTest.java | 38 ++++++++++++++++++++++
3 files changed, 70 insertions(+), 1 deletion(-)
diff --git
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 45434a2bf32..cbc502801cc 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -1289,7 +1289,7 @@ class KRaftClusterTest {
() => admin.createTopics(newTopics).all().get())
assertNotNull(executionException.getCause)
assertEquals(classOf[PolicyViolationException],
executionException.getCause.getClass)
- assertEquals("Unable to perform excessively large batch operation.",
+ assertEquals("Excessively large number of partitions per request.",
executionException.getCause.getMessage)
} finally {
admin.close()
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index da53ec4594b..491f4620304 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -152,6 +152,7 @@ import static
org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
*/
public class ReplicationControlManager {
static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000;
+ static final int MAX_PARTITIONS_PER_BATCH = 10_000;
static class Builder {
private SnapshotRegistry snapshotRegistry = null;
@@ -592,6 +593,8 @@ public class ReplicationControlManager {
Map<String, ApiError> topicErrors = new HashMap<>();
List<ApiMessageAndVersion> records =
BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
+ validateTotalNumberOfPartitions(request, defaultNumPartitions);
+
// Check the topic names.
validateNewTopicNames(topicErrors, request.topics(),
topicsWithCollisionChars);
@@ -1136,6 +1139,34 @@ public class ReplicationControlManager {
return ControllerResult.of(records, response);
}
+ /**
+ * Validates that a batch of topics will create less than {@value
MAX_PARTITIONS_PER_BATCH}. Exceeding this number of topics per batch
+ * has led to out-of-memory exceptions. We use this validation to fail
earlier to avoid allocating the memory.
+ * Validates an upper bound number of partitions. The actual number may be
smaller if some topics are misconfigured.
+ *
+ * @param request a batch of topics to create.
+ * @param defaultNumPartitions default number of partitions to assign if
unspecified.
+ * @throws PolicyViolationException if total number of partitions exceeds
{@value MAX_PARTITIONS_PER_BATCH}.
+ */
+ static void validateTotalNumberOfPartitions(CreateTopicsRequestData
request, int defaultNumPartitions) {
+ int totalPartitions = 0;
+ for (CreatableTopic topic: request.topics()) {
+ if (topic.assignments().isEmpty()) {
+ if (topic.numPartitions() == -1) {
+ totalPartitions += defaultNumPartitions;
+ } else if (topic.numPartitions() > 0) {
+ totalPartitions += topic.numPartitions();
+ }
+ } else {
+ totalPartitions += topic.assignments().size();
+ }
+
+ }
+ if (totalPartitions > MAX_PARTITIONS_PER_BATCH) {
+ throw new PolicyViolationException("Excessively large number of
partitions per request.");
+ }
+ }
+
/**
* Validate the partition information included in the alter partition
request.
*
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index e4f045b1fa2..194c295de28 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -565,6 +565,44 @@ public class ReplicationControlManagerTest {
}
}
+ @Test
+ public void testExcessiveNumberOfTopicsCannotBeCreated() {
+ // number of partitions is explicitly set without assignments
+ ReplicationControlTestContext ctx = new
ReplicationControlTestContext.Builder().build();
+ ReplicationControlManager replicationControl = ctx.replicationControl;
+ CreateTopicsRequestData request = new CreateTopicsRequestData();
+ request.topics().add(new CreatableTopic().setName("foo").
+ setNumPartitions(5000).setReplicationFactor((short) 1));
+ request.topics().add(new CreatableTopic().setName("bar").
+ setNumPartitions(5000).setReplicationFactor((short) 1));
+ request.topics().add(new CreatableTopic().setName("baz").
+ setNumPartitions(1).setReplicationFactor((short) 1));
+ ControllerRequestContext requestContext =
anonymousContextFor(ApiKeys.CREATE_TOPICS);
+ PolicyViolationException error = assertThrows(
+ PolicyViolationException.class,
+ () -> replicationControl.createTopics(requestContext, request,
Set.of("foo", "bar", "baz")));
+ assertEquals(error.getMessage(), "Excessively large number of
partitions per request.");
+ }
+
+ @Test
+ public void testExcessiveNumberOfTopicsCannotBeCreatedWithAssignments() {
+ CreateTopicsRequestData request = new CreateTopicsRequestData();
+ request.topics().add(new CreatableTopic().setName("foo").
+ setNumPartitions(-1).setReplicationFactor((short) 1));
+ CreateTopicsRequestData.CreatableReplicaAssignmentCollection
assignments =
+ new
CreateTopicsRequestData.CreatableReplicaAssignmentCollection();
+ assignments.add(new CreatableReplicaAssignment().setPartitionIndex(1));
+ assignments.add(new CreatableReplicaAssignment().setPartitionIndex(2));
+ request.topics().add(new CreatableTopic()
+ .setName("baz")
+ .setAssignments(assignments));
+ PolicyViolationException error = assertThrows(
+ PolicyViolationException.class,
+ () ->
ReplicationControlManager.validateTotalNumberOfPartitions(request, 9999)
+ );
+ assertEquals(error.getMessage(), "Excessively large number of
partitions per request.");
+ }
+
@Test
public void testCreateTopics() {
ReplicationControlTestContext ctx = new
ReplicationControlTestContext.Builder().build();