This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 590df2c  KAFKA-13316; Enable KRaft mode in CreateTopics tests (#11655)
590df2c is described below

commit 590df2c8befa912366925ab9b529c9f9c4bced70
Author: dengziming <[email protected]>
AuthorDate: Fri Feb 11 06:10:23 2022 +0800

    KAFKA-13316; Enable KRaft mode in CreateTopics tests (#11655)
    
    This PR follows #11629 to enable `CreateTopicsRequestWithForwardingTest`
    and `CreateTopicsRequestWithPolicyTest` in KRaft mode.
    
    Reviewers: Jason Gustafson <[email protected]>
---
 .../server/AbstractCreateTopicsRequestTest.scala   |  6 +--
 .../CreateTopicsRequestWithForwardingTest.scala    |  8 ++--
 .../server/CreateTopicsRequestWithPolicyTest.scala | 48 +++++++++++++++++-----
 3 files changed, 46 insertions(+), 16 deletions(-)

diff --git 
a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
index 661086c..91ff1d5 100644
--- 
a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData
 import org.apache.kafka.common.message.CreateTopicsRequestData._
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests._
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertNotNull, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertNotEquals, assertNotNull, assertTrue}
 
 import scala.jdk.CollectionConverters._
 
@@ -122,8 +122,8 @@ abstract class AbstractCreateTopicsRequestTest extends 
BaseRequestTest {
           topic.replicationFactor
 
         if (request.data.validateOnly) {
-          assertNotNull(metadataForTopic, s"Topic $topic should be created")
-          assertFalse(metadataForTopic.error == Errors.NONE, s"Error 
${metadataForTopic.error} for topic $topic")
+          assertNotNull(metadataForTopic)
+          assertNotEquals(Errors.NONE, metadataForTopic.error, s"Topic $topic 
should not be created")
           assertTrue(metadataForTopic.partitionMetadata.isEmpty, "The topic 
should have no partitions")
         }
         else {
diff --git 
a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithForwardingTest.scala
 
b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithForwardingTest.scala
index f55aa34..3eafcc4 100644
--- 
a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithForwardingTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithForwardingTest.scala
@@ -19,7 +19,8 @@ package kafka.server
 
 import org.apache.kafka.common.protocol.Errors
 import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import scala.jdk.CollectionConverters._
 
@@ -27,8 +28,9 @@ class CreateTopicsRequestWithForwardingTest extends 
AbstractCreateTopicsRequestT
 
   override def enableForwarding: Boolean = true
 
-  @Test
-  def testForwardToController(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testForwardToController(quorum: String): Unit = {
     val req = topicsReq(Seq(topicReq("topic1")))
     val response = sendCreateTopicRequest(req, notControllerSocketServer)
     // With forwarding enabled, request could be forwarded to the active 
controller.
diff --git 
a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala 
b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
index b5456c6..d480c7b 100644
--- 
a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
@@ -19,13 +19,14 @@ package kafka.server
 
 import java.util
 import java.util.Properties
-
 import kafka.log.LogConfig
 import org.apache.kafka.common.errors.PolicyViolationException
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.server.policy.CreateTopicPolicy
 import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import scala.jdk.CollectionConverters._
 
@@ -37,8 +38,15 @@ class CreateTopicsRequestWithPolicyTest extends 
AbstractCreateTopicsRequestTest
     properties.put(KafkaConfig.CreateTopicPolicyClassNameProp, 
classOf[Policy].getName)
   }
 
-  @Test
-  def testValidCreateTopicsRequests(): Unit = {
+  override def kraftControllerConfigs(): Seq[Properties] = {
+    val properties = new Properties()
+    properties.put(KafkaConfig.CreateTopicPolicyClassNameProp, 
classOf[Policy].getName)
+    Seq(properties)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testValidCreateTopicsRequests(quorum: String): Unit = {
     validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic1",
       numPartitions = 5))))
 
@@ -55,10 +63,11 @@ class CreateTopicsRequestWithPolicyTest extends 
AbstractCreateTopicsRequestTest
       assignment = Map(0 -> List(1, 0), 1 -> List(0, 1))))))
   }
 
-  @Test
-  def testErrorCreateTopicsRequests(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testErrorCreateTopicsRequests(quorum: String): Unit = {
     val existingTopic = "existing-topic"
-    createTopic(existingTopic, 1, 1)
+    createTopic(existingTopic, 5, 1)
 
     // Policy violations
     validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("policy-topic1",
@@ -93,20 +102,35 @@ class CreateTopicsRequestWithPolicyTest extends 
AbstractCreateTopicsRequestTest
       Map(existingTopic -> error(Errors.TOPIC_ALREADY_EXISTS,
         Some("Topic 'existing-topic' already exists."))))
 
+    var errorMsg = if (isKRaftTest()) {
+      "Unable to replicate the partition 4 time(s): The target replication 
factor of 4 cannot be reached because only 3 broker(s) are registered."
+    } else {
+      "Replication factor: 4 larger than available brokers: 3."
+    }
     
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-replication",
       numPartitions = 10, replicationFactor = brokerCount + 1)), validateOnly 
= true),
       Map("error-replication" -> error(Errors.INVALID_REPLICATION_FACTOR,
-        Some("Replication factor: 4 larger than available brokers: 3."))))
+        Some(errorMsg))))
 
+    errorMsg = if (isKRaftTest()) {
+      "Replication factor must be larger than 0, or -1 to use the default 
value."
+    } else {
+      "Replication factor must be larger than 0."
+    }
     
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-replication2",
       numPartitions = 10, replicationFactor = -2)), validateOnly = true),
       Map("error-replication2" -> error(Errors.INVALID_REPLICATION_FACTOR,
-        Some("Replication factor must be larger than 0."))))
+        Some(errorMsg))))
 
+    errorMsg = if (isKRaftTest()) {
+      "Number of partitions was set to an invalid non-positive value."
+    } else {
+      "Number of partitions must be larger than 0."
+    }
     
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-partitions",
       numPartitions = -2, replicationFactor = 1)), validateOnly = true),
       Map("error-partitions" -> error(Errors.INVALID_PARTITIONS,
-        Some("Number of partitions must be larger than 0."))))
+        Some(errorMsg))))
   }
 
 }
@@ -123,6 +147,10 @@ object CreateTopicsRequestWithPolicyTest {
     }
 
     def validate(requestMetadata: RequestMetadata): Unit = {
+      if (Topic.isInternal(requestMetadata.topic())) {
+        // Do not verify internal topics
+        return
+      }
       require(!closed, "Policy should not be closed")
       require(!configs.isEmpty, "configure should have been called with non 
empty configs")
 

Reply via email to