This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 590df2c KAFKA-13316; Enable KRaft mode in CreateTopics tests (#11655)
590df2c is described below
commit 590df2c8befa912366925ab9b529c9f9c4bced70
Author: dengziming <[email protected]>
AuthorDate: Fri Feb 11 06:10:23 2022 +0800
KAFKA-13316; Enable KRaft mode in CreateTopics tests (#11655)
This PR follows #11629 to enable `CreateTopicsRequestWithForwardingTest`
and `CreateTopicsRequestWithPolicyTest` in KRaft mode.
Reviewers: Jason Gustafson <[email protected]>
---
.../server/AbstractCreateTopicsRequestTest.scala | 6 +--
.../CreateTopicsRequestWithForwardingTest.scala | 8 ++--
.../server/CreateTopicsRequestWithPolicyTest.scala | 48 +++++++++++++++++-----
3 files changed, 46 insertions(+), 16 deletions(-)
diff --git
a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
index 661086c..91ff1d5 100644
---
a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData
import org.apache.kafka.common.message.CreateTopicsRequestData._
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests._
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse,
assertNotNull, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse,
assertNotEquals, assertNotNull, assertTrue}
import scala.jdk.CollectionConverters._
@@ -122,8 +122,8 @@ abstract class AbstractCreateTopicsRequestTest extends
BaseRequestTest {
topic.replicationFactor
if (request.data.validateOnly) {
- assertNotNull(metadataForTopic, s"Topic $topic should be created")
- assertFalse(metadataForTopic.error == Errors.NONE, s"Error
${metadataForTopic.error} for topic $topic")
+ assertNotNull(metadataForTopic)
+ assertNotEquals(Errors.NONE, metadataForTopic.error, s"Topic $topic
should not be created")
assertTrue(metadataForTopic.partitionMetadata.isEmpty, "The topic
should have no partitions")
}
else {
diff --git
a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithForwardingTest.scala
b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithForwardingTest.scala
index f55aa34..3eafcc4 100644
---
a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithForwardingTest.scala
+++
b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithForwardingTest.scala
@@ -19,7 +19,8 @@ package kafka.server
import org.apache.kafka.common.protocol.Errors
import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import scala.jdk.CollectionConverters._
@@ -27,8 +28,9 @@ class CreateTopicsRequestWithForwardingTest extends
AbstractCreateTopicsRequestT
override def enableForwarding: Boolean = true
- @Test
- def testForwardToController(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testForwardToController(quorum: String): Unit = {
val req = topicsReq(Seq(topicReq("topic1")))
val response = sendCreateTopicRequest(req, notControllerSocketServer)
// With forwarding enabled, request could be forwarded to the active
controller.
diff --git
a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
index b5456c6..d480c7b 100644
---
a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
+++
b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
@@ -19,13 +19,14 @@ package kafka.server
import java.util
import java.util.Properties
-
import kafka.log.LogConfig
import org.apache.kafka.common.errors.PolicyViolationException
+import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.server.policy.CreateTopicPolicy
import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import scala.jdk.CollectionConverters._
@@ -37,8 +38,15 @@ class CreateTopicsRequestWithPolicyTest extends
AbstractCreateTopicsRequestTest
properties.put(KafkaConfig.CreateTopicPolicyClassNameProp,
classOf[Policy].getName)
}
- @Test
- def testValidCreateTopicsRequests(): Unit = {
+ override def kraftControllerConfigs(): Seq[Properties] = {
+ val properties = new Properties()
+ properties.put(KafkaConfig.CreateTopicPolicyClassNameProp,
classOf[Policy].getName)
+ Seq(properties)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testValidCreateTopicsRequests(quorum: String): Unit = {
validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic1",
numPartitions = 5))))
@@ -55,10 +63,11 @@ class CreateTopicsRequestWithPolicyTest extends
AbstractCreateTopicsRequestTest
assignment = Map(0 -> List(1, 0), 1 -> List(0, 1))))))
}
- @Test
- def testErrorCreateTopicsRequests(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testErrorCreateTopicsRequests(quorum: String): Unit = {
val existingTopic = "existing-topic"
- createTopic(existingTopic, 1, 1)
+ createTopic(existingTopic, 5, 1)
// Policy violations
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("policy-topic1",
@@ -93,20 +102,35 @@ class CreateTopicsRequestWithPolicyTest extends
AbstractCreateTopicsRequestTest
Map(existingTopic -> error(Errors.TOPIC_ALREADY_EXISTS,
Some("Topic 'existing-topic' already exists."))))
+ var errorMsg = if (isKRaftTest()) {
+ "Unable to replicate the partition 4 time(s): The target replication
factor of 4 cannot be reached because only 3 broker(s) are registered."
+ } else {
+ "Replication factor: 4 larger than available brokers: 3."
+ }
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-replication",
numPartitions = 10, replicationFactor = brokerCount + 1)), validateOnly
= true),
Map("error-replication" -> error(Errors.INVALID_REPLICATION_FACTOR,
- Some("Replication factor: 4 larger than available brokers: 3."))))
+ Some(errorMsg))))
+ errorMsg = if (isKRaftTest()) {
+ "Replication factor must be larger than 0, or -1 to use the default
value."
+ } else {
+ "Replication factor must be larger than 0."
+ }
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-replication2",
numPartitions = 10, replicationFactor = -2)), validateOnly = true),
Map("error-replication2" -> error(Errors.INVALID_REPLICATION_FACTOR,
- Some("Replication factor must be larger than 0."))))
+ Some(errorMsg))))
+ errorMsg = if (isKRaftTest()) {
+ "Number of partitions was set to an invalid non-positive value."
+ } else {
+ "Number of partitions must be larger than 0."
+ }
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-partitions",
numPartitions = -2, replicationFactor = 1)), validateOnly = true),
Map("error-partitions" -> error(Errors.INVALID_PARTITIONS,
- Some("Number of partitions must be larger than 0."))))
+ Some(errorMsg))))
}
}
@@ -123,6 +147,10 @@ object CreateTopicsRequestWithPolicyTest {
}
def validate(requestMetadata: RequestMetadata): Unit = {
+ if (Topic.isInternal(requestMetadata.topic())) {
+ // Do not verify internal topics
+ return
+ }
require(!closed, "Policy should not be closed")
require(!configs.isEmpty, "configure should have been called with non
empty configs")