This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7530ac6 MINOR: enable KRaft in MetadataRequestTest (#11637)
7530ac6 is described below
commit 7530ac65834d98ae4f30fb32c50795b17f01b356
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Wed Jan 5 21:53:54 2022 -0800
MINOR: enable KRaft in MetadataRequestTest (#11637)
Reviewers: David Arthur <[email protected]>
---
.../kafka/api/IntegrationTestHarness.scala | 14 ++
.../kafka/server/QuorumTestHarness.scala | 5 +-
.../kafka/integration/KafkaServerTestHarness.scala | 15 +-
.../kafka/server/AbstractMetadataRequestTest.scala | 16 +-
.../scala/unit/kafka/server/BaseRequestTest.scala | 28 ++--
.../unit/kafka/server/MetadataRequestTest.scala | 167 +++++++++++++--------
.../test/scala/unit/kafka/utils/TestUtils.scala | 37 ++++-
7 files changed, 190 insertions(+), 92 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 0f987e9..bc4166c 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -62,6 +62,7 @@ abstract class IntegrationTestHarness extends
KafkaServerTestHarness {
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties,
logDirCount = logDirCount)
configureListeners(cfgs)
modifyConfigs(cfgs)
+ insertControllerListenersIfNeeded(cfgs)
cfgs.map(KafkaConfig.fromProps)
}
@@ -80,6 +81,19 @@ abstract class IntegrationTestHarness extends
KafkaServerTestHarness {
}
}
+ private def insertControllerListenersIfNeeded(props: Seq[Properties]): Unit
= {
+ if (isKRaftTest()) {
+ props.foreach { config =>
+ // Add a security protocol for the CONTROLLER endpoint, if one is not
already set.
+ val securityPairs =
config.getProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "").split(",")
+ if (!securityPairs.exists(_.startsWith("CONTROLLER:"))) {
+ config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp,
+ (securityPairs ++
Seq(s"CONTROLLER:${controllerListenerSecurityProtocol.toString}")).mkString(","))
+ }
+ }
+ }
+ }
+
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
doSetup(testInfo, createOffsetsTopic = true)
diff --git
a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 5e679a7..ab2cb34 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -103,9 +103,10 @@ abstract class QuorumTestHarness extends Logging {
protected def zkAclsEnabled: Option[Boolean] = None
/**
- * When in KRaft mode, this test harness only support PLAINTEXT
+ * When in KRaft mode, the security protocol to use for the controller
listener.
+ * Can be overridden by subclasses.
*/
- private val controllerListenerSecurityProtocol: SecurityProtocol =
SecurityProtocol.PLAINTEXT
+ protected val controllerListenerSecurityProtocol: SecurityProtocol =
SecurityProtocol.PLAINTEXT
protected def kraftControllerConfigs(): Seq[Properties] = {
Seq(new Properties())
diff --git
a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 9bd9f57..acb1201 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -153,7 +153,12 @@ abstract class KafkaServerTestHarness extends
QuorumTestHarness {
topicConfig: Properties = new Properties,
adminClientConfig: Properties = new Properties):
scala.collection.immutable.Map[Int, Int] = {
if (isKRaftTest()) {
- TestUtils.createTopicWithAdmin(topic, numPartitions, replicationFactor,
brokers, topicConfig, adminClientConfig)
+ TestUtils.createTopicWithAdmin(topic = topic,
+ brokers = brokers,
+ numPartitions = numPartitions,
+ replicationFactor = replicationFactor,
+ topicConfig = topicConfig,
+ adminConfig = adminClientConfig)
} else {
TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor,
servers, topicConfig)
}
@@ -165,7 +170,13 @@ abstract class KafkaServerTestHarness extends
QuorumTestHarness {
* Return the leader for each partition.
*/
def createTopic(topic: String, partitionReplicaAssignment:
collection.Map[Int, Seq[Int]]): scala.collection.immutable.Map[Int, Int] =
- TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment, servers)
+ if (isKRaftTest()) {
+ TestUtils.createTopicWithAdmin(topic = topic,
+ replicaAssignment = partitionReplicaAssignment,
+ brokers = brokers)
+ } else {
+ TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment,
servers)
+ }
def deleteTopic(topic: String): Unit = {
if (isKRaftTest()) {
diff --git
a/core/src/test/scala/unit/kafka/server/AbstractMetadataRequestTest.scala
b/core/src/test/scala/unit/kafka/server/AbstractMetadataRequestTest.scala
index b140851..309bfda 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractMetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractMetadataRequestTest.scala
@@ -53,9 +53,17 @@ abstract class AbstractMetadataRequestTest extends
BaseRequestTest {
}
protected def checkAutoCreatedTopic(autoCreatedTopic: String, response:
MetadataResponse): Unit = {
- assertEquals(Errors.LEADER_NOT_AVAILABLE,
response.errors.get(autoCreatedTopic))
- assertEquals(Some(servers.head.config.numPartitions),
zkClient.getTopicPartitionCount(autoCreatedTopic))
- for (i <- 0 until servers.head.config.numPartitions)
- TestUtils.waitForPartitionMetadata(servers, autoCreatedTopic, i)
+ if (isKRaftTest()) {
+ assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
response.errors.get(autoCreatedTopic))
+ for (i <- 0 until brokers.head.config.numPartitions) {
+ TestUtils.waitForPartitionMetadata(brokers, autoCreatedTopic, i)
+ }
+ } else {
+ assertEquals(Errors.LEADER_NOT_AVAILABLE,
response.errors.get(autoCreatedTopic))
+ assertEquals(Some(brokers.head.config.numPartitions),
zkClient.getTopicPartitionCount(autoCreatedTopic))
+ for (i <- 0 until brokers.head.config.numPartitions) {
+ TestUtils.waitForPartitionMetadata(brokers, autoCreatedTopic, i)
+ }
+ }
}
}
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index 3d3d0ca..eee4608 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -51,27 +51,35 @@ abstract class BaseRequestTest extends
IntegrationTestHarness {
}
def anySocketServer: SocketServer = {
- servers.find { server =>
- val state = server.brokerState
+ brokers.find { broker =>
+ val state = broker.brokerState
state != BrokerState.NOT_RUNNING && state != BrokerState.SHUTTING_DOWN
}.map(_.socketServer).getOrElse(throw new IllegalStateException("No live
broker is available"))
}
def controllerSocketServer: SocketServer = {
- servers.find { server =>
- server.kafkaController.isActive
- }.map(_.socketServer).getOrElse(throw new IllegalStateException("No
controller broker is available"))
+ if (isKRaftTest()) {
+ controllerServer.socketServer
+ } else {
+ servers.find { server =>
+ server.kafkaController.isActive
+ }.map(_.socketServer).getOrElse(throw new IllegalStateException("No
controller broker is available"))
+ }
}
def notControllerSocketServer: SocketServer = {
- servers.find { server =>
- !server.kafkaController.isActive
- }.map(_.socketServer).getOrElse(throw new IllegalStateException("No
non-controller broker is available"))
+ if (isKRaftTest()) {
+ anySocketServer
+ } else {
+ servers.find { server =>
+ !server.kafkaController.isActive
+ }.map(_.socketServer).getOrElse(throw new IllegalStateException("No
non-controller broker is available"))
+ }
}
def brokerSocketServer(brokerId: Int): SocketServer = {
- servers.find { server =>
- server.config.brokerId == brokerId
+ brokers.find { broker =>
+ broker.config.brokerId == brokerId
}.map(_.socketServer).getOrElse(throw new IllegalStateException(s"Could
not find broker with id $brokerId"))
}
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index d91d58e..d043264 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -28,7 +28,9 @@ import org.apache.kafka.common.requests.{MetadataRequest,
MetadataResponse}
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.test.TestUtils.isValidClusterId
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import scala.collection.Seq
import scala.jdk.CollectionConverters._
@@ -40,20 +42,27 @@ class MetadataRequestTest extends
AbstractMetadataRequestTest {
doSetup(testInfo, createOffsetsTopic = false)
}
- @Test
- def testClusterIdWithRequestVersion1(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testClusterIdWithRequestVersion1(quorum: String): Unit = {
val v1MetadataResponse =
sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort))
val v1ClusterId = v1MetadataResponse.clusterId
assertNull(v1ClusterId, s"v1 clusterId should be null")
}
- @Test
- def testClusterIdIsValid(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testClusterIdIsValid(quorum: String): Unit = {
val metadataResponse =
sendMetadataRequest(MetadataRequest.Builder.allTopics.build(2.toShort))
isValidClusterId(metadataResponse.clusterId)
}
- @Test
+ /**
+ * This test only runs in ZK mode because in KRaft mode, the controller ID
visible to
+ * the client is randomized.
+ */
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk"))
def testControllerId(): Unit = {
val controllerServer = servers.find(_.kafkaController.isActive).get
val controllerId = controllerServer.config.brokerId
@@ -75,8 +84,9 @@ class MetadataRequestTest extends AbstractMetadataRequestTest
{
}, "Controller id should match the active controller after failover", 5000)
}
- @Test
- def testRack(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testRack(quorum: String): Unit = {
val metadataResponse =
sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort))
// Validate rack matches what's set in generateConfigs() above
metadataResponse.brokers.forEach { broker =>
@@ -84,8 +94,9 @@ class MetadataRequestTest extends AbstractMetadataRequestTest
{
}
}
- @Test
- def testIsInternal(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testIsInternal(quorum: String): Unit = {
val internalTopic = Topic.GROUP_METADATA_TOPIC_NAME
val notInternalTopic = "notInternal"
// create the topics
@@ -105,8 +116,9 @@ class MetadataRequestTest extends
AbstractMetadataRequestTest {
assertEquals(Set(internalTopic).asJava,
metadataResponse.buildCluster().internalTopics)
}
- @Test
- def testNoTopicsRequest(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testNoTopicsRequest(quorum: String): Unit = {
// create some topics
createTopic("t1", 3, 2)
createTopic("t2", 3, 2)
@@ -118,8 +130,9 @@ class MetadataRequestTest extends
AbstractMetadataRequestTest {
assertTrue(metadataResponse.topicMetadata.isEmpty, "Response should have
no topics")
}
- @Test
- def testAutoTopicCreation(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAutoTopicCreation(quorum: String): Unit = {
val topic1 = "t1"
val topic2 = "t2"
val topic3 = "t3"
@@ -143,26 +156,34 @@ class MetadataRequestTest extends
AbstractMetadataRequestTest {
val response3 = sendMetadataRequest(new
MetadataRequest.Builder(Seq(topic4, topic5).asJava, false, 4.toShort).build)
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
response3.errors.get(topic4))
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
response3.errors.get(topic5))
- assertEquals(None, zkClient.getTopicPartitionCount(topic5))
+ if (!isKRaftTest()) {
+ assertEquals(None, zkClient.getTopicPartitionCount(topic5))
+ }
}
- @Test
- def testAutoCreateTopicWithInvalidReplicationFactor(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAutoCreateTopicWithInvalidReplicationFactor(quorum: String): Unit = {
// Shutdown all but one broker so that the number of brokers is less than
the default replication factor
- servers.tail.foreach(_.shutdown())
- servers.tail.foreach(_.awaitShutdown())
+ brokers.tail.foreach(_.shutdown())
+ brokers.tail.foreach(_.awaitShutdown())
val topic1 = "testAutoCreateTopic"
val response1 = sendMetadataRequest(new
MetadataRequest.Builder(Seq(topic1).asJava, true).build)
assertEquals(1, response1.topicMetadata.size)
val topicMetadata = response1.topicMetadata.asScala.head
- assertEquals(Errors.INVALID_REPLICATION_FACTOR, topicMetadata.error)
+ if (isKRaftTest()) {
+ assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, topicMetadata.error)
+ } else {
+ assertEquals(Errors.INVALID_REPLICATION_FACTOR, topicMetadata.error)
+ }
assertEquals(topic1, topicMetadata.topic)
assertEquals(0, topicMetadata.partitionMetadata.size)
}
- @Test
- def testAutoCreateOfCollidingTopics(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk"))
+ def testAutoCreateOfCollidingTopics(quorum: String): Unit = {
val topic1 = "testAutoCreate.Topic"
val topic2 = "testAutoCreate_Topic"
val response1 = sendMetadataRequest(new
MetadataRequest.Builder(Seq(topic1, topic2).asJava, true).build)
@@ -176,7 +197,7 @@ class MetadataRequestTest extends
AbstractMetadataRequestTest {
val topicCreated = responseMap.head._1
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topicCreated, 0)
- TestUtils.waitForPartitionMetadata(servers, topicCreated, 0)
+ TestUtils.waitForPartitionMetadata(brokers, topicCreated, 0)
// retry the metadata for the first auto created topic
val response2 = sendMetadataRequest(new
MetadataRequest.Builder(Seq(topicCreated).asJava, true).build)
@@ -191,8 +212,9 @@ class MetadataRequestTest extends
AbstractMetadataRequestTest {
assertTrue(partitionMetadata.leaderId.get >= 0)
}
- @Test
- def testAllTopicsRequest(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAllTopicsRequest(quorum: String): Unit = {
// create some topics
createTopic("t1", 3, 2)
createTopic("t2", 3, 2)
@@ -208,8 +230,9 @@ class MetadataRequestTest extends
AbstractMetadataRequestTest {
assertEquals(2, metadataResponseV1.topicMetadata.size(), "V1 Response
should have 2 (all) topics")
}
- @Test
- def testTopicIdsInResponse(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testTopicIdsInResponse(quorum: String): Unit = {
val replicaAssignment = Map(0 -> Seq(1, 2, 0), 1 -> Seq(2, 0, 1))
val topic1 = "topic1"
val topic2 = "topic2"
@@ -217,7 +240,7 @@ class MetadataRequestTest extends
AbstractMetadataRequestTest {
createTopic(topic2, replicaAssignment)
// if version < 9, return ZERO_UUID in MetadataResponse
- val resp1 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1,
topic2).asJava, true, 0, 9).build(), Some(controllerSocketServer))
+ val resp1 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1,
topic2).asJava, true, 0, 9).build(), Some(anySocketServer))
assertEquals(2, resp1.topicMetadata.size)
resp1.topicMetadata.forEach { topicMetadata =>
assertEquals(Errors.NONE, topicMetadata.error)
@@ -225,7 +248,7 @@ class MetadataRequestTest extends
AbstractMetadataRequestTest {
}
// from version 10, UUID will be included in MetadataResponse
- val resp2 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1,
topic2).asJava, true, 10, 10).build(), Some(notControllerSocketServer))
+ val resp2 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1,
topic2).asJava, true, 10, 10).build(), Some(anySocketServer))
assertEquals(2, resp2.topicMetadata.size)
resp2.topicMetadata.forEach { topicMetadata =>
assertEquals(Errors.NONE, topicMetadata.error)
@@ -237,15 +260,15 @@ class MetadataRequestTest extends
AbstractMetadataRequestTest {
/**
* Preferred replica should be the first item in the replicas list
*/
- @Test
- def testPreferredReplica(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testPreferredReplica(quorum: String): Unit = {
val replicaAssignment = Map(0 -> Seq(1, 2, 0), 1 -> Seq(2, 0, 1))
createTopic("t1", replicaAssignment)
- // Call controller and one different broker to ensure that metadata
propagation works correctly
- val responses = Seq(
- sendMetadataRequest(new MetadataRequest.Builder(Seq("t1").asJava,
true).build(), Some(controllerSocketServer)),
- sendMetadataRequest(new MetadataRequest.Builder(Seq("t1").asJava,
true).build(), Some(notControllerSocketServer))
- )
+ // Test metadata on two different brokers to ensure that metadata
propagation works correctly
+ val responses = Seq(0, 1).map(index =>
+ sendMetadataRequest(new MetadataRequest.Builder(Seq("t1").asJava,
true).build(),
+ Some(brokers(index).socketServer)))
responses.foreach { response =>
assertEquals(1, response.topicMetadata.size)
val topicMetadata = response.topicMetadata.iterator.next()
@@ -261,8 +284,9 @@ class MetadataRequestTest extends
AbstractMetadataRequestTest {
}
}
- @Test
- def testReplicaDownResponse(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testReplicaDownResponse(quorum: String): Unit = {
val replicaDownTopic = "replicaDown"
val replicaCount = 3
@@ -272,8 +296,8 @@ class MetadataRequestTest extends
AbstractMetadataRequestTest {
// Kill a replica node that is not the leader
val metadataResponse = sendMetadataRequest(new
MetadataRequest.Builder(List(replicaDownTopic).asJava, true).build())
val partitionMetadata =
metadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
- val downNode = servers.find { server =>
- val serverId = server.dataPlaneRequestProcessor.brokerId
+ val downNode = brokers.find { broker =>
+ val serverId = broker.dataPlaneRequestProcessor.brokerId
val leaderId = partitionMetadata.leaderId
val replicaIds = partitionMetadata.replicaIds.asScala
leaderId.isPresent && leaderId.get() != serverId &&
replicaIds.contains(serverId)
@@ -283,7 +307,7 @@ class MetadataRequestTest extends
AbstractMetadataRequestTest {
TestUtils.waitUntilTrue(() => {
val response = sendMetadataRequest(new
MetadataRequest.Builder(List(replicaDownTopic).asJava, true).build())
!response.brokers.asScala.exists(_.id ==
downNode.dataPlaneRequestProcessor.brokerId)
- }, "Replica was not found down", 5000)
+ }, "Replica was not found down", 50000)
// Validate version 0 still filters unavailable replicas and contains error
val v0MetadataResponse = sendMetadataRequest(new
MetadataRequest(requestData(List(replicaDownTopic), true), 0.toShort))
@@ -306,10 +330,14 @@ class MetadataRequestTest extends
AbstractMetadataRequestTest {
assertEquals(replicaCount, v1PartitionMetadata.replicaIds.size, s"Response
should have $replicaCount replicas")
}
- @Test
- def testIsrAfterBrokerShutDownAndJoinsBack(): Unit = {
- def checkIsr(servers: Seq[KafkaServer], topic: String): Unit = {
- val activeBrokers = servers.filter(_.brokerState !=
BrokerState.NOT_RUNNING)
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testIsrAfterBrokerShutDownAndJoinsBack(quorum: String): Unit = {
+ def checkIsr[B <: KafkaBroker](
+ brokers: Seq[B],
+ topic: String
+ ): Unit = {
+ val activeBrokers = brokers.filter(_.brokerState !=
BrokerState.NOT_RUNNING)
val expectedIsr = activeBrokers.map(_.config.brokerId).toSet
// Assert that topic metadata at new brokers is updated correctly
@@ -333,45 +361,52 @@ class MetadataRequestTest extends
AbstractMetadataRequestTest {
val replicaCount = 3
createTopic(topic, 1, replicaCount)
- servers.last.shutdown()
- servers.last.awaitShutdown()
- servers.last.startup()
+ brokers.last.shutdown()
+ brokers.last.awaitShutdown()
+ brokers.last.startup()
- checkIsr(servers, topic)
+ checkIsr(brokers, topic)
}
- @Test
- def testAliveBrokersWithNoTopics(): Unit = {
- def checkMetadata(servers: Seq[KafkaServer], expectedBrokersCount: Int):
Unit = {
- var controllerMetadataResponse: Option[MetadataResponse] = None
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAliveBrokersWithNoTopics(quorum: String): Unit = {
+ def checkMetadata[B <: KafkaBroker](
+ brokers: Seq[B],
+ expectedBrokersCount: Int
+ ): Unit = {
+ var response: Option[MetadataResponse] = None
TestUtils.waitUntilTrue(() => {
val metadataResponse =
sendMetadataRequest(MetadataRequest.Builder.allTopics.build,
- Some(controllerSocketServer))
- controllerMetadataResponse = Some(metadataResponse)
+ Some(anySocketServer))
+ response = Some(metadataResponse)
metadataResponse.brokers.size == expectedBrokersCount
- }, s"Expected $expectedBrokersCount brokers, but there are
${controllerMetadataResponse.get.brokers.size} " +
- "according to the Controller")
+ }, s"Expected $expectedBrokersCount brokers, but there are
${response.get.brokers.size}")
- val brokersInController =
controllerMetadataResponse.get.brokers.asScala.toSeq.sortBy(_.id)
+ val brokersSorted = response.get.brokers.asScala.toSeq.sortBy(_.id)
// Assert that metadata is propagated correctly
- servers.filter(_.brokerState != BrokerState.NOT_RUNNING).foreach {
broker =>
+ brokers.filter(_.brokerState == BrokerState.RUNNING).foreach { broker =>
TestUtils.waitUntilTrue(() => {
val metadataResponse =
sendMetadataRequest(MetadataRequest.Builder.allTopics.build,
Some(brokerSocketServer(broker.config.brokerId)))
val brokers = metadataResponse.brokers.asScala.toSeq.sortBy(_.id)
val topicMetadata =
metadataResponse.topicMetadata.asScala.toSeq.sortBy(_.topic)
- brokersInController == brokers &&
metadataResponse.topicMetadata.asScala.toSeq.sortBy(_.topic) == topicMetadata
+ brokersSorted == brokers &&
metadataResponse.topicMetadata.asScala.toSeq.sortBy(_.topic) == topicMetadata
}, s"Topic metadata not updated correctly")
}
}
- val serverToShutdown = servers.filterNot(_.kafkaController.isActive).last
- serverToShutdown.shutdown()
- serverToShutdown.awaitShutdown()
- checkMetadata(servers, servers.size - 1)
+ val brokerToShutdown = if (isKRaftTest()) {
+ brokers.last
+ } else {
+ servers.filterNot(_.kafkaController.isActive).last
+ }
+ brokerToShutdown.shutdown()
+ brokerToShutdown.awaitShutdown()
+ checkMetadata(brokers, brokers.size - 1)
- serverToShutdown.startup()
- checkMetadata(servers, servers.size)
+ brokerToShutdown.startup()
+ checkMetadata(brokers, brokers.size)
}
}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 99d7639..f91d80c 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -24,6 +24,7 @@ import java.nio.charset.{Charset, StandardCharsets}
import java.nio.file.{Files, StandardOpenOption}
import java.security.cert.X509Certificate
import java.time.Duration
+import java.util
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.concurrent.{Callable, CompletableFuture, ExecutionException,
Executors, TimeUnit}
import java.util.{Arrays, Collections, Optional, Properties}
@@ -381,22 +382,41 @@ object TestUtils extends Logging {
def createTopicWithAdmin[B <: KafkaBroker](
topic: String,
+ brokers: Seq[B],
numPartitions: Int = 1,
replicationFactor: Int = 1,
- brokers: Seq[B],
+ replicaAssignment: collection.Map[Int, Seq[Int]] = Map.empty,
topicConfig: Properties = new Properties,
adminConfig: Properties = new Properties):
scala.collection.immutable.Map[Int, Int] = {
+ val effectiveNumPartitions = if (replicaAssignment.isEmpty) {
+ numPartitions
+ } else {
+ replicaAssignment.size
+ }
val adminClient = createAdminClient(brokers, adminConfig)
try {
- val configsMap = new java.util.HashMap[String, String]()
+ val configsMap = new util.HashMap[String, String]()
topicConfig.forEach((k, v) => configsMap.put(k.toString, v.toString))
try {
- adminClient.createTopics(Collections.singletonList(new NewTopic(
- topic, numPartitions,
replicationFactor.toShort).configs(configsMap))).all().get()
+ val result = if (replicaAssignment.isEmpty) {
+ adminClient.createTopics(Collections.singletonList(new NewTopic(
+ topic, numPartitions,
replicationFactor.toShort).configs(configsMap)))
+ } else {
+ val assignment = new util.HashMap[Integer, util.List[Integer]]()
+ replicaAssignment.forKeyValue { case (k, v) =>
+ val replicas = new util.ArrayList[Integer]
+ v.foreach(r => replicas.add(r.asInstanceOf[Integer]))
+ assignment.put(k.asInstanceOf[Integer], replicas)
+ }
+ adminClient.createTopics(Collections.singletonList(new NewTopic(
+ topic, assignment).configs(configsMap)))
+ }
+ result.all().get()
} catch {
case e: ExecutionException => if (!(e.getCause != null &&
e.getCause.isInstanceOf[TopicExistsException] &&
- topicHasSameNumPartitionsAndReplicationFactor(adminClient, topic,
numPartitions, replicationFactor))) {
+ topicHasSameNumPartitionsAndReplicationFactor(adminClient, topic,
+ effectiveNumPartitions, replicationFactor))) {
throw e
}
}
@@ -404,9 +424,9 @@ object TestUtils extends Logging {
adminClient.close()
}
// wait until we've propagated all partitions metadata to all brokers
- val allPartitionsMetadata = waitForAllPartitionsMetadata(brokers, topic,
numPartitions)
+ val allPartitionsMetadata = waitForAllPartitionsMetadata(brokers, topic,
effectiveNumPartitions)
- (0 until numPartitions).map { i =>
+ (0 until effectiveNumPartitions).map { i =>
i -> allPartitionsMetadata.get(new TopicPartition(topic,
i)).map(_.leader()).getOrElse(
throw new IllegalStateException(s"Cannot get the partition leader for
topic: $topic, partition: $i in server metadata cache"))
}.toMap
@@ -1063,7 +1083,8 @@ object TestUtils extends Logging {
*/
def waitForAllPartitionsMetadata[B <: KafkaBroker](
brokers: Seq[B],
- topic: String, expectedNumPartitions: Int): Map[TopicPartition,
UpdateMetadataPartitionState] = {
+ topic: String,
+ expectedNumPartitions: Int): Map[TopicPartition,
UpdateMetadataPartitionState] = {
waitUntilTrue(
() => brokers.forall { broker =>
if (expectedNumPartitions == 0) {