This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7530ac6  MINOR: enable KRaft in MetadataRequestTest (#11637)
7530ac6 is described below

commit 7530ac65834d98ae4f30fb32c50795b17f01b356
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Wed Jan 5 21:53:54 2022 -0800

    MINOR: enable KRaft in MetadataRequestTest (#11637)
    
    Reviewers: David Arthur <[email protected]>
---
 .../kafka/api/IntegrationTestHarness.scala         |  14 ++
 .../kafka/server/QuorumTestHarness.scala           |   5 +-
 .../kafka/integration/KafkaServerTestHarness.scala |  15 +-
 .../kafka/server/AbstractMetadataRequestTest.scala |  16 +-
 .../scala/unit/kafka/server/BaseRequestTest.scala  |  28 ++--
 .../unit/kafka/server/MetadataRequestTest.scala    | 167 +++++++++++++--------
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  37 ++++-
 7 files changed, 190 insertions(+), 92 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala 
b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 0f987e9..bc4166c 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -62,6 +62,7 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
       trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, 
logDirCount = logDirCount)
     configureListeners(cfgs)
     modifyConfigs(cfgs)
+    insertControllerListenersIfNeeded(cfgs)
     cfgs.map(KafkaConfig.fromProps)
   }
 
@@ -80,6 +81,19 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
     }
   }
 
+  private def insertControllerListenersIfNeeded(props: Seq[Properties]): Unit 
= {
+    if (isKRaftTest()) {
+      props.foreach { config =>
+        // Add a security protocol for the CONTROLLER endpoint, if one is not 
already set.
+        val securityPairs = 
config.getProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "").split(",")
+        if (!securityPairs.exists(_.startsWith("CONTROLLER:"))) {
+          config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp,
+            (securityPairs ++ 
Seq(s"CONTROLLER:${controllerListenerSecurityProtocol.toString}")).mkString(","))
+        }
+      }
+    }
+  }
+
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     doSetup(testInfo, createOffsetsTopic = true)
diff --git 
a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala 
b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 5e679a7..ab2cb34 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -103,9 +103,10 @@ abstract class QuorumTestHarness extends Logging {
   protected def zkAclsEnabled: Option[Boolean] = None
 
   /**
-   * When in KRaft mode, this test harness only support PLAINTEXT
+   * When in KRaft mode, the security protocol to use for the controller 
listener.
+   * Can be overridden by subclasses.
    */
-  private val controllerListenerSecurityProtocol: SecurityProtocol = 
SecurityProtocol.PLAINTEXT
+  protected val controllerListenerSecurityProtocol: SecurityProtocol = 
SecurityProtocol.PLAINTEXT
 
   protected def kraftControllerConfigs(): Seq[Properties] = {
     Seq(new Properties())
diff --git 
a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala 
b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 9bd9f57..acb1201 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -153,7 +153,12 @@ abstract class KafkaServerTestHarness extends 
QuorumTestHarness {
                   topicConfig: Properties = new Properties,
                   adminClientConfig: Properties = new Properties): 
scala.collection.immutable.Map[Int, Int] = {
     if (isKRaftTest()) {
-      TestUtils.createTopicWithAdmin(topic, numPartitions, replicationFactor, 
brokers, topicConfig, adminClientConfig)
+      TestUtils.createTopicWithAdmin(topic = topic,
+        brokers = brokers,
+        numPartitions = numPartitions,
+        replicationFactor = replicationFactor,
+        topicConfig = topicConfig,
+        adminConfig = adminClientConfig)
     } else {
       TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor, 
servers, topicConfig)
     }
@@ -165,7 +170,13 @@ abstract class KafkaServerTestHarness extends 
QuorumTestHarness {
    * Return the leader for each partition.
    */
   def createTopic(topic: String, partitionReplicaAssignment: 
collection.Map[Int, Seq[Int]]): scala.collection.immutable.Map[Int, Int] =
-    TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment, servers)
+    if (isKRaftTest()) {
+      TestUtils.createTopicWithAdmin(topic = topic,
+        replicaAssignment = partitionReplicaAssignment,
+        brokers = brokers)
+    } else {
+      TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment, 
servers)
+    }
 
   def deleteTopic(topic: String): Unit = {
     if (isKRaftTest()) {
diff --git 
a/core/src/test/scala/unit/kafka/server/AbstractMetadataRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/AbstractMetadataRequestTest.scala
index b140851..309bfda 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractMetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractMetadataRequestTest.scala
@@ -53,9 +53,17 @@ abstract class AbstractMetadataRequestTest extends 
BaseRequestTest {
   }
 
   protected def checkAutoCreatedTopic(autoCreatedTopic: String, response: 
MetadataResponse): Unit = {
-    assertEquals(Errors.LEADER_NOT_AVAILABLE, 
response.errors.get(autoCreatedTopic))
-    assertEquals(Some(servers.head.config.numPartitions), 
zkClient.getTopicPartitionCount(autoCreatedTopic))
-    for (i <- 0 until servers.head.config.numPartitions)
-      TestUtils.waitForPartitionMetadata(servers, autoCreatedTopic, i)
+    if (isKRaftTest()) {
+      assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
response.errors.get(autoCreatedTopic))
+      for (i <- 0 until brokers.head.config.numPartitions) {
+        TestUtils.waitForPartitionMetadata(brokers, autoCreatedTopic, i)
+      }
+    } else {
+      assertEquals(Errors.LEADER_NOT_AVAILABLE, 
response.errors.get(autoCreatedTopic))
+      assertEquals(Some(brokers.head.config.numPartitions), 
zkClient.getTopicPartitionCount(autoCreatedTopic))
+      for (i <- 0 until brokers.head.config.numPartitions) {
+        TestUtils.waitForPartitionMetadata(brokers, autoCreatedTopic, i)
+      }
+    }
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index 3d3d0ca..eee4608 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -51,27 +51,35 @@ abstract class BaseRequestTest extends 
IntegrationTestHarness {
   }
 
   def anySocketServer: SocketServer = {
-    servers.find { server =>
-      val state = server.brokerState
+    brokers.find { broker =>
+      val state = broker.brokerState
       state != BrokerState.NOT_RUNNING && state != BrokerState.SHUTTING_DOWN
     }.map(_.socketServer).getOrElse(throw new IllegalStateException("No live 
broker is available"))
   }
 
   def controllerSocketServer: SocketServer = {
-    servers.find { server =>
-      server.kafkaController.isActive
-    }.map(_.socketServer).getOrElse(throw new IllegalStateException("No 
controller broker is available"))
+    if (isKRaftTest()) {
+     controllerServer.socketServer
+    } else {
+      servers.find { server =>
+        server.kafkaController.isActive
+      }.map(_.socketServer).getOrElse(throw new IllegalStateException("No 
controller broker is available"))
+    }
   }
 
   def notControllerSocketServer: SocketServer = {
-    servers.find { server =>
-      !server.kafkaController.isActive
-    }.map(_.socketServer).getOrElse(throw new IllegalStateException("No 
non-controller broker is available"))
+    if (isKRaftTest()) {
+      anySocketServer
+    } else {
+      servers.find { server =>
+        !server.kafkaController.isActive
+      }.map(_.socketServer).getOrElse(throw new IllegalStateException("No 
non-controller broker is available"))
+    }
   }
 
   def brokerSocketServer(brokerId: Int): SocketServer = {
-    servers.find { server =>
-      server.config.brokerId == brokerId
+    brokers.find { broker =>
+      broker.config.brokerId == brokerId
     }.map(_.socketServer).getOrElse(throw new IllegalStateException(s"Could 
not find broker with id $brokerId"))
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index d91d58e..d043264 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -28,7 +28,9 @@ import org.apache.kafka.common.requests.{MetadataRequest, 
MetadataResponse}
 import org.apache.kafka.metadata.BrokerState
 import org.apache.kafka.test.TestUtils.isValidClusterId
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import scala.collection.Seq
 import scala.jdk.CollectionConverters._
@@ -40,20 +42,27 @@ class MetadataRequestTest extends 
AbstractMetadataRequestTest {
     doSetup(testInfo, createOffsetsTopic = false)
   }
 
-  @Test
-  def testClusterIdWithRequestVersion1(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testClusterIdWithRequestVersion1(quorum: String): Unit = {
     val v1MetadataResponse = 
sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort))
     val v1ClusterId = v1MetadataResponse.clusterId
     assertNull(v1ClusterId, s"v1 clusterId should be null")
   }
 
-  @Test
-  def testClusterIdIsValid(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testClusterIdIsValid(quorum: String): Unit = {
     val metadataResponse = 
sendMetadataRequest(MetadataRequest.Builder.allTopics.build(2.toShort))
     isValidClusterId(metadataResponse.clusterId)
   }
 
-  @Test
+  /**
+   * This test only runs in ZK mode because in KRaft mode, the controller ID 
visible to
+   * the client is randomized.
+   */
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk"))
-  def testControllerId(): Unit = {
+  def testControllerId(quorum: String): Unit = {
     val controllerServer = servers.find(_.kafkaController.isActive).get
     val controllerId = controllerServer.config.brokerId
@@ -75,8 +84,9 @@ class MetadataRequestTest extends AbstractMetadataRequestTest 
{
     }, "Controller id should match the active controller after failover", 5000)
   }
 
-  @Test
-  def testRack(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testRack(quorum: String): Unit = {
     val metadataResponse = 
sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort))
     // Validate rack matches what's set in generateConfigs() above
     metadataResponse.brokers.forEach { broker =>
@@ -84,8 +94,9 @@ class MetadataRequestTest extends AbstractMetadataRequestTest 
{
     }
   }
 
-  @Test
-  def testIsInternal(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testIsInternal(quorum: String): Unit = {
     val internalTopic = Topic.GROUP_METADATA_TOPIC_NAME
     val notInternalTopic = "notInternal"
     // create the topics
@@ -105,8 +116,9 @@ class MetadataRequestTest extends 
AbstractMetadataRequestTest {
     assertEquals(Set(internalTopic).asJava, 
metadataResponse.buildCluster().internalTopics)
   }
 
-  @Test
-  def testNoTopicsRequest(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testNoTopicsRequest(quorum: String): Unit = {
     // create some topics
     createTopic("t1", 3, 2)
     createTopic("t2", 3, 2)
@@ -118,8 +130,9 @@ class MetadataRequestTest extends 
AbstractMetadataRequestTest {
     assertTrue(metadataResponse.topicMetadata.isEmpty, "Response should have 
no topics")
   }
 
-  @Test
-  def testAutoTopicCreation(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAutoTopicCreation(quorum: String): Unit = {
     val topic1 = "t1"
     val topic2 = "t2"
     val topic3 = "t3"
@@ -143,26 +156,34 @@ class MetadataRequestTest extends 
AbstractMetadataRequestTest {
     val response3 = sendMetadataRequest(new 
MetadataRequest.Builder(Seq(topic4, topic5).asJava, false, 4.toShort).build)
     assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
response3.errors.get(topic4))
     assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
response3.errors.get(topic5))
-    assertEquals(None, zkClient.getTopicPartitionCount(topic5))
+    if (!isKRaftTest()) {
+      assertEquals(None, zkClient.getTopicPartitionCount(topic5))
+    }
   }
 
-  @Test
-  def testAutoCreateTopicWithInvalidReplicationFactor(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAutoCreateTopicWithInvalidReplicationFactor(quorum: String): Unit = {
     // Shutdown all but one broker so that the number of brokers is less than 
the default replication factor
-    servers.tail.foreach(_.shutdown())
-    servers.tail.foreach(_.awaitShutdown())
+    brokers.tail.foreach(_.shutdown())
+    brokers.tail.foreach(_.awaitShutdown())
 
     val topic1 = "testAutoCreateTopic"
     val response1 = sendMetadataRequest(new 
MetadataRequest.Builder(Seq(topic1).asJava, true).build)
     assertEquals(1, response1.topicMetadata.size)
     val topicMetadata = response1.topicMetadata.asScala.head
-    assertEquals(Errors.INVALID_REPLICATION_FACTOR, topicMetadata.error)
+    if (isKRaftTest()) {
+      assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, topicMetadata.error)
+    } else {
+      assertEquals(Errors.INVALID_REPLICATION_FACTOR, topicMetadata.error)
+    }
     assertEquals(topic1, topicMetadata.topic)
     assertEquals(0, topicMetadata.partitionMetadata.size)
   }
 
-  @Test
-  def testAutoCreateOfCollidingTopics(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk"))
+  def testAutoCreateOfCollidingTopics(quorum: String): Unit = {
     val topic1 = "testAutoCreate.Topic"
     val topic2 = "testAutoCreate_Topic"
     val response1 = sendMetadataRequest(new 
MetadataRequest.Builder(Seq(topic1, topic2).asJava, true).build)
@@ -176,7 +197,7 @@ class MetadataRequestTest extends 
AbstractMetadataRequestTest {
 
     val topicCreated = responseMap.head._1
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topicCreated, 0)
-    TestUtils.waitForPartitionMetadata(servers, topicCreated, 0)
+    TestUtils.waitForPartitionMetadata(brokers, topicCreated, 0)
 
     // retry the metadata for the first auto created topic
     val response2 = sendMetadataRequest(new 
MetadataRequest.Builder(Seq(topicCreated).asJava, true).build)
@@ -191,8 +212,9 @@ class MetadataRequestTest extends 
AbstractMetadataRequestTest {
     assertTrue(partitionMetadata.leaderId.get >= 0)
   }
 
-  @Test
-  def testAllTopicsRequest(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAllTopicsRequest(quorum: String): Unit = {
     // create some topics
     createTopic("t1", 3, 2)
     createTopic("t2", 3, 2)
@@ -208,8 +230,9 @@ class MetadataRequestTest extends 
AbstractMetadataRequestTest {
     assertEquals(2, metadataResponseV1.topicMetadata.size(), "V1 Response 
should have 2 (all) topics")
   }
 
-  @Test
-  def testTopicIdsInResponse(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testTopicIdsInResponse(quorum: String): Unit = {
     val replicaAssignment = Map(0 -> Seq(1, 2, 0), 1 -> Seq(2, 0, 1))
     val topic1 = "topic1"
     val topic2 = "topic2"
@@ -217,7 +240,7 @@ class MetadataRequestTest extends 
AbstractMetadataRequestTest {
     createTopic(topic2, replicaAssignment)
 
     // if version < 9, return ZERO_UUID in MetadataResponse
-    val resp1 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1, 
topic2).asJava, true, 0, 9).build(), Some(controllerSocketServer))
+    val resp1 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1, 
topic2).asJava, true, 0, 9).build(), Some(anySocketServer))
     assertEquals(2, resp1.topicMetadata.size)
     resp1.topicMetadata.forEach { topicMetadata =>
       assertEquals(Errors.NONE, topicMetadata.error)
@@ -225,7 +248,7 @@ class MetadataRequestTest extends 
AbstractMetadataRequestTest {
     }
 
     // from version 10, UUID will be included in MetadataResponse
-    val resp2 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1, 
topic2).asJava, true, 10, 10).build(), Some(notControllerSocketServer))
+    val resp2 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1, 
topic2).asJava, true, 10, 10).build(), Some(anySocketServer))
     assertEquals(2, resp2.topicMetadata.size)
     resp2.topicMetadata.forEach { topicMetadata =>
       assertEquals(Errors.NONE, topicMetadata.error)
@@ -237,15 +260,15 @@ class MetadataRequestTest extends 
AbstractMetadataRequestTest {
   /**
     * Preferred replica should be the first item in the replicas list
     */
-  @Test
-  def testPreferredReplica(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testPreferredReplica(quorum: String): Unit = {
     val replicaAssignment = Map(0 -> Seq(1, 2, 0), 1 -> Seq(2, 0, 1))
     createTopic("t1", replicaAssignment)
-    // Call controller and one different broker to ensure that metadata 
propagation works correctly
-    val responses = Seq(
-      sendMetadataRequest(new MetadataRequest.Builder(Seq("t1").asJava, 
true).build(), Some(controllerSocketServer)),
-      sendMetadataRequest(new MetadataRequest.Builder(Seq("t1").asJava, 
true).build(), Some(notControllerSocketServer))
-    )
+    // Test metadata on two different brokers to ensure that metadata 
propagation works correctly
+    val responses = Seq(0, 1).map(index =>
+      sendMetadataRequest(new MetadataRequest.Builder(Seq("t1").asJava, 
true).build(),
+        Some(brokers(index).socketServer)))
     responses.foreach { response =>
       assertEquals(1, response.topicMetadata.size)
       val topicMetadata = response.topicMetadata.iterator.next()
@@ -261,8 +284,9 @@ class MetadataRequestTest extends 
AbstractMetadataRequestTest {
     }
   }
 
-  @Test
-  def testReplicaDownResponse(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testReplicaDownResponse(quorum: String): Unit = {
     val replicaDownTopic = "replicaDown"
     val replicaCount = 3
 
@@ -272,8 +296,8 @@ class MetadataRequestTest extends 
AbstractMetadataRequestTest {
     // Kill a replica node that is not the leader
     val metadataResponse = sendMetadataRequest(new 
MetadataRequest.Builder(List(replicaDownTopic).asJava, true).build())
     val partitionMetadata = 
metadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
-    val downNode = servers.find { server =>
-      val serverId = server.dataPlaneRequestProcessor.brokerId
+    val downNode = brokers.find { broker =>
+      val serverId = broker.dataPlaneRequestProcessor.brokerId
       val leaderId = partitionMetadata.leaderId
       val replicaIds = partitionMetadata.replicaIds.asScala
       leaderId.isPresent && leaderId.get() != serverId && 
replicaIds.contains(serverId)
@@ -283,7 +307,7 @@ class MetadataRequestTest extends 
AbstractMetadataRequestTest {
     TestUtils.waitUntilTrue(() => {
       val response = sendMetadataRequest(new 
MetadataRequest.Builder(List(replicaDownTopic).asJava, true).build())
       !response.brokers.asScala.exists(_.id == 
downNode.dataPlaneRequestProcessor.brokerId)
-    }, "Replica was not found down", 5000)
+    }, "Replica was not found down", 50000)
 
     // Validate version 0 still filters unavailable replicas and contains error
     val v0MetadataResponse = sendMetadataRequest(new 
MetadataRequest(requestData(List(replicaDownTopic), true), 0.toShort))
@@ -306,10 +330,14 @@ class MetadataRequestTest extends 
AbstractMetadataRequestTest {
     assertEquals(replicaCount, v1PartitionMetadata.replicaIds.size, s"Response 
should have $replicaCount replicas")
   }
 
-  @Test
-  def testIsrAfterBrokerShutDownAndJoinsBack(): Unit = {
-    def checkIsr(servers: Seq[KafkaServer], topic: String): Unit = {
-      val activeBrokers = servers.filter(_.brokerState != 
BrokerState.NOT_RUNNING)
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testIsrAfterBrokerShutDownAndJoinsBack(quorum: String): Unit = {
+    def checkIsr[B <: KafkaBroker](
+      brokers: Seq[B],
+      topic: String
+    ): Unit = {
+      val activeBrokers = brokers.filter(_.brokerState != 
BrokerState.NOT_RUNNING)
       val expectedIsr = activeBrokers.map(_.config.brokerId).toSet
 
       // Assert that topic metadata at new brokers is updated correctly
@@ -333,45 +361,52 @@ class MetadataRequestTest extends 
AbstractMetadataRequestTest {
     val replicaCount = 3
     createTopic(topic, 1, replicaCount)
 
-    servers.last.shutdown()
-    servers.last.awaitShutdown()
-    servers.last.startup()
+    brokers.last.shutdown()
+    brokers.last.awaitShutdown()
+    brokers.last.startup()
 
-    checkIsr(servers, topic)
+    checkIsr(brokers, topic)
   }
 
-  @Test
-  def testAliveBrokersWithNoTopics(): Unit = {
-    def checkMetadata(servers: Seq[KafkaServer], expectedBrokersCount: Int): 
Unit = {
-      var controllerMetadataResponse: Option[MetadataResponse] = None
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAliveBrokersWithNoTopics(quorum: String): Unit = {
+    def checkMetadata[B <: KafkaBroker](
+      brokers: Seq[B],
+      expectedBrokersCount: Int
+    ): Unit = {
+      var response: Option[MetadataResponse] = None
       TestUtils.waitUntilTrue(() => {
         val metadataResponse = 
sendMetadataRequest(MetadataRequest.Builder.allTopics.build,
-          Some(controllerSocketServer))
-        controllerMetadataResponse = Some(metadataResponse)
+          Some(anySocketServer))
+        response = Some(metadataResponse)
         metadataResponse.brokers.size == expectedBrokersCount
-      }, s"Expected $expectedBrokersCount brokers, but there are 
${controllerMetadataResponse.get.brokers.size} " +
-        "according to the Controller")
+      }, s"Expected $expectedBrokersCount brokers, but there are 
${response.get.brokers.size}")
 
-      val brokersInController = 
controllerMetadataResponse.get.brokers.asScala.toSeq.sortBy(_.id)
+      val brokersSorted = response.get.brokers.asScala.toSeq.sortBy(_.id)
 
       // Assert that metadata is propagated correctly
-      servers.filter(_.brokerState != BrokerState.NOT_RUNNING).foreach { 
broker =>
+      brokers.filter(_.brokerState == BrokerState.RUNNING).foreach { broker =>
         TestUtils.waitUntilTrue(() => {
           val metadataResponse = 
sendMetadataRequest(MetadataRequest.Builder.allTopics.build,
             Some(brokerSocketServer(broker.config.brokerId)))
           val brokers = metadataResponse.brokers.asScala.toSeq.sortBy(_.id)
           val topicMetadata = 
metadataResponse.topicMetadata.asScala.toSeq.sortBy(_.topic)
-          brokersInController == brokers && 
metadataResponse.topicMetadata.asScala.toSeq.sortBy(_.topic) == topicMetadata
+          brokersSorted == brokers && 
metadataResponse.topicMetadata.asScala.toSeq.sortBy(_.topic) == topicMetadata
         }, s"Topic metadata not updated correctly")
       }
     }
 
-    val serverToShutdown = servers.filterNot(_.kafkaController.isActive).last
-    serverToShutdown.shutdown()
-    serverToShutdown.awaitShutdown()
-    checkMetadata(servers, servers.size - 1)
+    val brokerToShutdown = if (isKRaftTest()) {
+      brokers.last
+    } else {
+      servers.filterNot(_.kafkaController.isActive).last
+    }
+    brokerToShutdown.shutdown()
+    brokerToShutdown.awaitShutdown()
+    checkMetadata(brokers, brokers.size - 1)
 
-    serverToShutdown.startup()
-    checkMetadata(servers, servers.size)
+    brokerToShutdown.startup()
+    checkMetadata(brokers, brokers.size)
   }
 }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 99d7639..f91d80c 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -24,6 +24,7 @@ import java.nio.charset.{Charset, StandardCharsets}
 import java.nio.file.{Files, StandardOpenOption}
 import java.security.cert.X509Certificate
 import java.time.Duration
+import java.util
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.concurrent.{Callable, CompletableFuture, ExecutionException, 
Executors, TimeUnit}
 import java.util.{Arrays, Collections, Optional, Properties}
@@ -381,22 +382,41 @@ object TestUtils extends Logging {
 
   def createTopicWithAdmin[B <: KafkaBroker](
       topic: String,
+      brokers: Seq[B],
       numPartitions: Int = 1,
       replicationFactor: Int = 1,
-      brokers: Seq[B],
+      replicaAssignment: collection.Map[Int, Seq[Int]] = Map.empty,
       topicConfig: Properties = new Properties,
       adminConfig: Properties = new Properties): 
scala.collection.immutable.Map[Int, Int] = {
+    val effectiveNumPartitions = if (replicaAssignment.isEmpty) {
+      numPartitions
+    } else {
+      replicaAssignment.size
+    }
     val adminClient = createAdminClient(brokers, adminConfig)
     try {
-      val configsMap = new java.util.HashMap[String, String]()
+      val configsMap = new util.HashMap[String, String]()
       topicConfig.forEach((k, v) => configsMap.put(k.toString, v.toString))
       try {
-        adminClient.createTopics(Collections.singletonList(new NewTopic(
-          topic, numPartitions, 
replicationFactor.toShort).configs(configsMap))).all().get()
+        val result = if (replicaAssignment.isEmpty) {
+          adminClient.createTopics(Collections.singletonList(new NewTopic(
+            topic, numPartitions, 
replicationFactor.toShort).configs(configsMap)))
+        } else {
+          val assignment = new util.HashMap[Integer, util.List[Integer]]()
+          replicaAssignment.forKeyValue { case (k, v) =>
+            val replicas = new util.ArrayList[Integer]
+            v.foreach(r => replicas.add(r.asInstanceOf[Integer]))
+            assignment.put(k.asInstanceOf[Integer], replicas)
+          }
+          adminClient.createTopics(Collections.singletonList(new NewTopic(
+            topic, assignment).configs(configsMap)))
+        }
+        result.all().get()
       } catch {
         case e: ExecutionException => if (!(e.getCause != null &&
             e.getCause.isInstanceOf[TopicExistsException] &&
-            topicHasSameNumPartitionsAndReplicationFactor(adminClient, topic, 
numPartitions, replicationFactor))) {
+            topicHasSameNumPartitionsAndReplicationFactor(adminClient, topic,
+              effectiveNumPartitions, replicationFactor))) {
           throw e
         }
       }
@@ -404,9 +424,9 @@ object TestUtils extends Logging {
       adminClient.close()
     }
     // wait until we've propagated all partitions metadata to all brokers
-    val allPartitionsMetadata = waitForAllPartitionsMetadata(brokers, topic, 
numPartitions)
+    val allPartitionsMetadata = waitForAllPartitionsMetadata(brokers, topic, 
effectiveNumPartitions)
 
-    (0 until numPartitions).map { i =>
+    (0 until effectiveNumPartitions).map { i =>
       i -> allPartitionsMetadata.get(new TopicPartition(topic, 
i)).map(_.leader()).getOrElse(
         throw new IllegalStateException(s"Cannot get the partition leader for 
topic: $topic, partition: $i in server metadata cache"))
     }.toMap
@@ -1063,7 +1083,8 @@ object TestUtils extends Logging {
    */
   def waitForAllPartitionsMetadata[B <: KafkaBroker](
       brokers: Seq[B],
-      topic: String, expectedNumPartitions: Int): Map[TopicPartition, 
UpdateMetadataPartitionState] = {
+      topic: String,
+      expectedNumPartitions: Int): Map[TopicPartition, 
UpdateMetadataPartitionState] = {
     waitUntilTrue(
       () => brokers.forall { broker =>
         if (expectedNumPartitions == 0) {

Reply via email to