This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0781de247fb KAFKA-12617 Convert MetadataRequestTest to use ClusterTest (#21101)
0781de247fb is described below
commit 0781de247fb72a4433b7bd2dbd2802d39fe6c7d7
Author: Chang-Yu Huang <[email protected]>
AuthorDate: Fri Jan 2 07:10:09 2026 -0500
KAFKA-12617 Convert MetadataRequestTest to use ClusterTest (#21101)
# Description
Changed `MetadataRequestTest` to use `ClusterTest`
# Changes
Added `ClusterInstance#createTopicWithAssignment` to align with
`KafkaServerTestHarness#createTopicWithAssignment` (a usage sketch follows).
Helper functions previously inherited from base classes such as
`AbstractMetadataRequestTest` were rewritten in the new Java test.
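A minimal, hypothetical sketch of calling the new helper from a
`ClusterTest`-based test (the test and topic names and the assignment map
are made up; `clusterInstance` is the constructor-injected instance used
throughout the converted test):
```
@ClusterTest(brokers = 3)
public void testWithManualAssignment() throws InterruptedException {
    // Keys are partition ids; each value lists replica broker ids, preferred replica first.
    Map<Integer, List<Integer>> assignment = Map.of(0, List.of(1, 2, 0), 1, List.of(2, 0, 1));
    // Creates the topic via Admin and waits until all partitions are reported as created.
    clusterInstance.createTopicWithAssignment("demo-topic", assignment);
}
```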
# Issue
`testIsrAfterBrokerShutDownAndJoinsBack` fails because of a thread leak:
```
org.opentest4j.AssertionFailedError: Thread leak detected: JMX server connection timeout 28 ==>
Expected :true
Actual :false
```
`testAliveBrokersWithNoTopics` would also fail if the last
`checkMetadata` is removed.
Reviewers: Chia-Ping Tsai <[email protected]>
---
checkstyle/suppressions.xml | 1 +
.../kafka/server/AbstractMetadataRequestTest.scala | 62 ---
.../unit/kafka/server/MetadataRequestTest.scala | 341 ----------------
.../apache/kafka/server/MetadataRequestTest.java | 431 +++++++++++++++++++++
.../apache/kafka/common/test/ClusterInstance.java | 8 +
5 files changed, 440 insertions(+), 403 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index e821674bfb6..e82b3e57fab 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -50,6 +50,7 @@
<suppress checks="CyclomaticComplexity"
files="ListConsumerGroupTest.java"/>
<suppress
checks="ClassFanOutComplexity|CyclomaticComplexity|MethodLength|ParameterNumber|JavaNCSS|ImportControl"
files="RequestConvertToJson.java"/>
<suppress checks="ImportControl"
files="BrokerRegistrationRequestTest.java"/>
+ <suppress checks="ImportControl" files="MetadataRequestTest.java"/>
<!-- Clients -->
<suppress id="dontUseSystemExit"
diff --git a/core/src/test/scala/unit/kafka/server/AbstractMetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractMetadataRequestTest.scala
deleted file mode 100644
index b0f0f74e88d..00000000000
--- a/core/src/test/scala/unit/kafka/server/AbstractMetadataRequestTest.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.util.Properties
-import kafka.network.SocketServer
-import kafka.utils.TestUtils
-import org.apache.kafka.common.message.MetadataRequestData
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
-import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.apache.kafka.server.config.{ServerConfigs, ReplicationConfigs}
-import org.junit.jupiter.api.Assertions.assertEquals
-
-abstract class AbstractMetadataRequestTest extends BaseRequestTest {
-
-  override def brokerPropertyOverrides(properties: Properties): Unit = {
-    properties.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1")
-    properties.setProperty(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, "2")
-    properties.setProperty(ServerConfigs.BROKER_RACK_CONFIG, s"rack/${properties.getProperty(ServerConfigs.BROKER_ID_CONFIG)}")
-  }
-
-  protected def requestData(topics: List[String], allowAutoTopicCreation: Boolean): MetadataRequestData = {
- val data = new MetadataRequestData
- if (topics == null)
- data.setTopics(null)
- else
- topics.foreach(topic =>
- data.topics.add(
- new MetadataRequestData.MetadataRequestTopic()
- .setName(topic)))
-
- data.setAllowAutoTopicCreation(allowAutoTopicCreation)
- data
- }
-
-  protected def sendMetadataRequest(request: MetadataRequest, destination: Option[SocketServer] = None): MetadataResponse = {
-    connectAndReceive[MetadataResponse](request, destination = destination.getOrElse(anySocketServer))
-  }
-
-  protected def checkAutoCreatedTopic(autoCreatedTopic: String, response: MetadataResponse): Unit = {
-    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.errors.get(autoCreatedTopic))
- for (i <- 0 until brokers.head.config.numPartitions) {
- TestUtils.waitForPartitionMetadata(brokers, autoCreatedTopic, i)
- }
- }
-}
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
deleted file mode 100644
index 47200721690..00000000000
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ /dev/null
@@ -1,341 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.util.Optional
-import kafka.utils.TestUtils
-import org.apache.kafka.common.Uuid
-import org.apache.kafka.common.errors.UnsupportedVersionException
-import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
-import org.apache.kafka.metadata.BrokerState
-import org.apache.kafka.test.TestUtils.isValidClusterId
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
-
-import scala.collection.Seq
-import scala.jdk.CollectionConverters._
-
-class MetadataRequestTest extends AbstractMetadataRequestTest {
-
- @BeforeEach
- override def setUp(testInfo: TestInfo): Unit = {
- doSetup(testInfo, createOffsetsTopic = false)
- }
-
- @Test
- def testClusterIdWithRequestVersion1(): Unit = {
-    val v1MetadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort))
- val v1ClusterId = v1MetadataResponse.clusterId
- assertNull(v1ClusterId, s"v1 clusterId should be null")
- }
-
- @Test
- def testClusterIdIsValid(): Unit = {
-    val metadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(4.toShort))
- isValidClusterId(metadataResponse.clusterId)
- }
-
- @Test
- def testRack(): Unit = {
-    val metadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(4.toShort))
-    // Validate rack matches what's set in generateConfigs() above
-    metadataResponse.brokers.forEach { broker =>
-      assertEquals(s"rack/${broker.id}", broker.rack, "Rack information should match config")
- }
- }
-
- @Test
- def testIsInternal(): Unit = {
- val internalTopic = Topic.GROUP_METADATA_TOPIC_NAME
- val notInternalTopic = "notInternal"
- // create the topics
- createTopic(internalTopic, 3, 2)
- createTopic(notInternalTopic, 3, 2)
-
-    val metadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(4.toShort))
-    assertTrue(metadataResponse.errors.isEmpty, "Response should have no errors")
-
-    val topicMetadata = metadataResponse.topicMetadata.asScala
-    val internalTopicMetadata = topicMetadata.find(_.topic == internalTopic).get
-    val notInternalTopicMetadata = topicMetadata.find(_.topic == notInternalTopic).get
-
-    assertTrue(internalTopicMetadata.isInternal, "internalTopic should show isInternal")
-    assertFalse(notInternalTopicMetadata.isInternal, "notInternalTopic topic not should show isInternal")
-
-    assertEquals(Set(internalTopic).asJava, metadataResponse.buildCluster().internalTopics)
- }
-
- @Test
- def testNoTopicsRequest(): Unit = {
- // create some topics
- createTopic("t1", 3, 2)
- createTopic("t2", 3, 2)
-
-    val metadataResponse = sendMetadataRequest(new MetadataRequest.Builder(List[String]().asJava, true, 4.toShort).build)
-    assertTrue(metadataResponse.errors.isEmpty, "Response should have no errors")
-    assertTrue(metadataResponse.topicMetadata.isEmpty, "Response should have no topics")
- }
-
- @Test
- def testAutoTopicCreation(): Unit = {
- val topic1 = "t1"
- val topic2 = "t2"
- val topic3 = "t3"
- val topic4 = "t4"
- val topic5 = "t5"
- createTopic(topic1)
-
-    val response1 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1, topic2).asJava, true).build())
-    assertNull(response1.errors.get(topic1))
-    checkAutoCreatedTopic(topic2, response1)
-
-    // The default behavior in old versions of the metadata API is to allow topic creation, so
-    // protocol downgrades should happen gracefully when auto-creation is explicitly requested.
-    val response2 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic3).asJava, true).build(1))
-    checkAutoCreatedTopic(topic3, response2)
-
-    // V3 doesn't support a configurable allowAutoTopicCreation, so disabling auto-creation is not supported
-    assertThrows(classOf[UnsupportedVersionException], () => sendMetadataRequest(new MetadataRequest(requestData(List(topic4), allowAutoTopicCreation = false), 3.toShort)))
-
-    // V4 and higher support a configurable allowAutoTopicCreation
-    val response3 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic4, topic5).asJava, false, 4.toShort).build)
-    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response3.errors.get(topic4))
-    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response3.errors.get(topic5))
- }
-
- @Test
- def testAutoCreateTopicWithInvalidReplicationFactor(): Unit = {
-    // Shutdown all but one broker so that the number of brokers is less than the default replication factor
-    brokers.tail.foreach(_.shutdown())
-    brokers.tail.foreach(_.awaitShutdown())
-
-    val topic1 = "testAutoCreateTopic"
-    val response1 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1).asJava, true).build)
-    assertEquals(1, response1.topicMetadata.size)
-    val topicMetadata = response1.topicMetadata.asScala.head
-    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, topicMetadata.error)
-    assertEquals(topic1, topicMetadata.topic)
-    assertEquals(0, topicMetadata.partitionMetadata.size)
- }
-
- @Test
- def testAllTopicsRequest(): Unit = {
- // create some topics
- createTopic("t1", 3, 2)
- createTopic("t2", 3, 2)
-
- // v0, Empty list represents all topics
-    val metadataResponseV0 = sendMetadataRequest(new MetadataRequest(requestData(List(), allowAutoTopicCreation = true), 0.toShort))
-    assertTrue(metadataResponseV0.errors.isEmpty, "V0 Response should have no errors")
-    assertEquals(2, metadataResponseV0.topicMetadata.size(), "V0 Response should have 2 (all) topics")
-
-    // v1, Null represents all topics
-    val metadataResponseV1 = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort))
-    assertTrue(metadataResponseV1.errors.isEmpty, "V1 Response should have no errors")
-    assertEquals(2, metadataResponseV1.topicMetadata.size(), "V1 Response should have 2 (all) topics")
- }
-
- @Test
- def testTopicIdsInResponse(): Unit = {
- val replicaAssignment = Map(0 -> Seq(1, 2, 0), 1 -> Seq(2, 0, 1))
- val topic1 = "topic1"
- val topic2 = "topic2"
- createTopicWithAssignment(topic1, replicaAssignment)
- createTopicWithAssignment(topic2, replicaAssignment)
-
- // if version < 9, return ZERO_UUID in MetadataResponse
-    val resp1 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1, topic2).asJava, true, 0, 9).build(), Some(anySocketServer))
-    assertEquals(2, resp1.topicMetadata.size)
-    resp1.topicMetadata.forEach { topicMetadata =>
-      assertEquals(Errors.NONE, topicMetadata.error)
-      assertEquals(Uuid.ZERO_UUID, topicMetadata.topicId())
-    }
-
-    // from version 10, UUID will be included in MetadataResponse
-    val resp2 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1, topic2).asJava, true, 10, 10).build(), Some(anySocketServer))
- assertEquals(2, resp2.topicMetadata.size)
- resp2.topicMetadata.forEach { topicMetadata =>
- assertEquals(Errors.NONE, topicMetadata.error)
- assertNotEquals(Uuid.ZERO_UUID, topicMetadata.topicId())
- assertNotNull(topicMetadata.topicId())
- }
- }
-
- /**
- * Preferred replica should be the first item in the replicas list
- */
- @Test
- def testPreferredReplica(): Unit = {
- val replicaAssignment = Map(0 -> Seq(1, 2, 0), 1 -> Seq(2, 0, 1))
- createTopicWithAssignment("t1", replicaAssignment)
-    // Test metadata on two different brokers to ensure that metadata propagation works correctly
-    val responses = Seq(0, 1).map(index =>
-      sendMetadataRequest(new MetadataRequest.Builder(Seq("t1").asJava, true).build(),
-        Some(brokers(index).socketServer)))
-    responses.foreach { response =>
-      assertEquals(1, response.topicMetadata.size)
-      val topicMetadata = response.topicMetadata.iterator.next()
-      assertEquals(Errors.NONE, topicMetadata.error)
-      assertEquals("t1", topicMetadata.topic)
-      assertEquals(Set(0, 1), topicMetadata.partitionMetadata.asScala.map(_.partition).toSet)
- topicMetadata.partitionMetadata.forEach { partitionMetadata =>
- val assignment = replicaAssignment(partitionMetadata.partition)
- assertEquals(assignment, partitionMetadata.replicaIds.asScala)
- assertEquals(assignment, partitionMetadata.inSyncReplicaIds.asScala)
- assertEquals(Optional.of(assignment.head), partitionMetadata.leaderId)
- }
- }
- }
-
- @Test
- def testPartitionInfoPreferredReplica(): Unit = {
- val replicaAssignment = Map(0 -> Seq(1, 2, 0))
- val topic = "testPartitionInfoPreferredReplicaTopic"
- createTopicWithAssignment(topic, replicaAssignment)
-
-    val response = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic).asJava, true).build())
- val cluster = response.buildCluster()
- val partitionInfos = cluster.partitionsForTopic(topic).asScala
- assertEquals(1, partitionInfos.size)
-
- val partitionInfo = partitionInfos.head
- val preferredReplicaId = replicaAssignment(partitionInfo.partition()).head
- assertEquals(preferredReplicaId, partitionInfo.replicas().head.id())
- }
-
- @Test
- def testReplicaDownResponse(): Unit = {
- val replicaDownTopic = "replicaDown"
- val replicaCount = 3
-
- // create a topic with 3 replicas
- createTopic(replicaDownTopic, 1, replicaCount)
-
- // Kill a replica node that is not the leader
-    val metadataResponse = sendMetadataRequest(new MetadataRequest.Builder(List(replicaDownTopic).asJava, true).build())
-    val partitionMetadata = metadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
-    val downNode = brokers.find { broker =>
-      val serverId = broker.dataPlaneRequestProcessor.brokerId
-      val leaderId = partitionMetadata.leaderId
-      val replicaIds = partitionMetadata.replicaIds.asScala
-      leaderId.isPresent && leaderId.get() != serverId && replicaIds.contains(serverId)
-    }.get
-    downNode.shutdown()
-
-    TestUtils.waitUntilTrue(() => {
-      val response = sendMetadataRequest(new MetadataRequest.Builder(List(replicaDownTopic).asJava, true).build())
-      !response.brokers.asScala.exists(_.id == downNode.dataPlaneRequestProcessor.brokerId)
-    }, "Replica was not found down", 50000)
-
-    // Validate version 0 still filters unavailable replicas and contains error
-    val v0MetadataResponse = sendMetadataRequest(new MetadataRequest(requestData(List(replicaDownTopic), allowAutoTopicCreation = true), 0.toShort))
-    val v0BrokerIds = v0MetadataResponse.brokers().asScala.map(_.id).toSeq
-    assertTrue(v0MetadataResponse.errors.isEmpty, "Response should have no errors")
-    assertFalse(v0BrokerIds.contains(downNode.config.brokerId), s"The downed broker should not be in the brokers list")
-    assertTrue(v0MetadataResponse.topicMetadata.size == 1, "Response should have one topic")
-    val v0PartitionMetadata = v0MetadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
-    assertTrue(v0PartitionMetadata.error == Errors.REPLICA_NOT_AVAILABLE, "PartitionMetadata should have an error")
-    assertTrue(v0PartitionMetadata.replicaIds.size == replicaCount - 1, s"Response should have ${replicaCount - 1} replicas")
-
-    // Validate version 1 returns unavailable replicas with no error
-    val v1MetadataResponse = sendMetadataRequest(new MetadataRequest.Builder(List(replicaDownTopic).asJava, true).build(1))
-    val v1BrokerIds = v1MetadataResponse.brokers().asScala.map(_.id).toSeq
-    assertTrue(v1MetadataResponse.errors.isEmpty, "Response should have no errors")
-    assertFalse(v1BrokerIds.contains(downNode.config.brokerId), s"The downed broker should not be in the brokers list")
-    assertEquals(1, v1MetadataResponse.topicMetadata.size, "Response should have one topic")
-    val v1PartitionMetadata = v1MetadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
-    assertEquals(Errors.NONE, v1PartitionMetadata.error, "PartitionMetadata should have no errors")
-    assertEquals(replicaCount, v1PartitionMetadata.replicaIds.size, s"Response should have $replicaCount replicas")
- }
-
- @Test
- def testIsrAfterBrokerShutDownAndJoinsBack(): Unit = {
- def checkIsr[B <: KafkaBroker](
- brokers: Seq[B],
- topic: String
- ): Unit = {
-      val activeBrokers = brokers.filter(_.brokerState != BrokerState.NOT_RUNNING)
-      val expectedIsr = activeBrokers.map(_.config.brokerId).toSet
-
-      // Assert that topic metadata at new brokers is updated correctly
-      activeBrokers.foreach { broker =>
-        var actualIsr = Set.empty[Int]
-        TestUtils.waitUntilTrue(() => {
-          val metadataResponse = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic).asJava, false).build,
-            Some(brokerSocketServer(broker.config.brokerId)))
-          val firstPartitionMetadata = metadataResponse.topicMetadata.asScala.headOption.flatMap(_.partitionMetadata.asScala.headOption)
- actualIsr = firstPartitionMetadata.map { partitionMetadata =>
- partitionMetadata.inSyncReplicaIds.asScala.map(Int.unbox).toSet
- }.getOrElse(Set.empty)
- expectedIsr == actualIsr
- }, s"Topic metadata not updated correctly in broker $broker\n" +
- s"Expected ISR: $expectedIsr \n" +
- s"Actual ISR : $actualIsr")
- }
- }
-
- val topic = "isr-after-broker-shutdown"
- val replicaCount = 3
- createTopic(topic, 1, replicaCount)
-
- brokers.last.shutdown()
- brokers.last.awaitShutdown()
- brokers.last.startup()
-
- checkIsr(brokers, topic)
- }
-
- @Test
- def testAliveBrokersWithNoTopics(): Unit = {
- def checkMetadata[B <: KafkaBroker](
- brokers: Seq[B],
- expectedBrokersCount: Int
- ): Unit = {
- var response: Option[MetadataResponse] = None
- TestUtils.waitUntilTrue(() => {
-        val metadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build,
-          Some(anySocketServer))
-        response = Some(metadataResponse)
-        metadataResponse.brokers.size == expectedBrokersCount
-      }, s"Expected $expectedBrokersCount brokers, but there are ${response.get.brokers.size}")
-
-      val brokersSorted = response.get.brokers.asScala.toSeq.sortBy(_.id)
-
-      // Assert that metadata is propagated correctly
-      brokers.filter(_.brokerState == BrokerState.RUNNING).foreach { broker =>
-        TestUtils.waitUntilTrue(() => {
-          val metadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build,
-            Some(brokerSocketServer(broker.config.brokerId)))
-          val brokers = metadataResponse.brokers.asScala.toSeq.sortBy(_.id)
-          val topicMetadata = metadataResponse.topicMetadata.asScala.toSeq.sortBy(_.topic)
-          brokersSorted == brokers && metadataResponse.topicMetadata.asScala.toSeq.sortBy(_.topic) == topicMetadata
-        }, s"Topic metadata not updated correctly")
- }
- }
-
- val brokerToShutdown = brokers.last
- brokerToShutdown.shutdown()
- brokerToShutdown.awaitShutdown()
- checkMetadata(brokers, brokers.size - 1)
-
- brokerToShutdown.startup()
- checkMetadata(brokers, brokers.size)
- }
-}
diff --git a/server/src/test/java/org/apache/kafka/server/MetadataRequestTest.java b/server/src/test/java/org/apache/kafka/server/MetadataRequestTest.java
new file mode 100644
index 00000000000..95b157001da
--- /dev/null
+++ b/server/src/test/java/org/apache/kafka/server/MetadataRequestTest.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import kafka.network.SocketServer;
+import kafka.server.KafkaBroker;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.MetadataRequestData;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.metadata.BrokerState;
+import org.apache.kafka.metadata.LeaderAndIsr;
+import org.apache.kafka.server.config.ReplicationConfigs;
+import org.apache.kafka.test.TestUtils;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ClusterTestDefaults(
+ brokers = 3,
+ serverProperties = {
+        @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "5"),
+        @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+ @ClusterConfigProperty(id = 0, key = "broker.rack", value = "rack/0"),
+ @ClusterConfigProperty(id = 1, key = "broker.rack", value = "rack/1"),
+ @ClusterConfigProperty(id = 2, key = "broker.rack", value = "rack/2")
+ }
+)
+
+public class MetadataRequestTest {
+
+ private final ClusterInstance clusterInstance;
+
+ MetadataRequestTest(ClusterInstance clusterInstance) {
+ this.clusterInstance = clusterInstance;
+ }
+
+ private List<KafkaBroker> brokers() {
+ return clusterInstance.brokers().values().stream().toList();
+ }
+
+    private SocketServer anySocketServer() throws IllegalStateException {
+        Map<Integer, KafkaBroker> aliveBrokers = clusterInstance.aliveBrokers();
+        if (aliveBrokers.isEmpty()) {
+            throw new IllegalStateException("No alive broker is available");
+        }
+        return aliveBrokers.values().stream().map(KafkaBroker::socketServer).iterator().next();
+    }
+
+    private MetadataRequestData requestData(List<String> topics, boolean allowAutoTopicCreation) {
+        MetadataRequestData data = new MetadataRequestData();
+        if (topics == null) {
+            data.setTopics(null);
+        } else {
+            List<MetadataRequestData.MetadataRequestTopic> requestTopics = topics.stream().map(topic -> new MetadataRequestData.MetadataRequestTopic().setName(topic)).toList();
+ data.setTopics(requestTopics);
+ data.setAllowAutoTopicCreation(allowAutoTopicCreation);
+ }
+ return data;
+ }
+
+    private MetadataResponse sendMetadataRequest(MetadataRequest request) throws IOException {
+        return sendMetadataRequest(request, anySocketServer());
+    }
+
+    private MetadataResponse sendMetadataRequest(MetadataRequest request, SocketServer destination) throws IOException {
+        ListenerName listener = clusterInstance.clientListener();
+        int port = destination.boundPort(listener);
+        return IntegrationTestUtils.connectAndReceive(request, port);
+    }
+
+    protected void checkAutoCreatedTopic(String autoCreatedTopic, MetadataResponse response) throws InterruptedException {
+        assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.errors().get(autoCreatedTopic));
+        int numPartitions = brokers().get(0).config().numPartitions();
+
+        for (int i = 0; i < numPartitions; i++) {
+            final int partitionId = i;
+            TestUtils.waitForCondition(() -> {
+                for (KafkaBroker broker : brokers()) {
+                    Optional<LeaderAndIsr> leaderOpt = broker.metadataCache().getLeaderAndIsr(autoCreatedTopic, partitionId);
+                    if (leaderOpt.isEmpty()) {
+                        return false;
+                    }
+                    if (!FetchRequest.isValidBrokerId(leaderOpt.get().leader())) {
+                        return false;
+                    }
+                }
+                return true;
+            }, "Partition [" + autoCreatedTopic + "," + partitionId + "] metadata not propagated");
+        }
+    }
+
+ @ClusterTest
+ public void testClusterIdWithRequestVersion1() throws IOException {
+        MetadataResponse v1MetadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics().build((short) 1));
+ String v1ClusterId = v1MetadataResponse.clusterId();
+ assertNull(v1ClusterId, "v1 clusterId should be null");
+ }
+
+ @ClusterTest
+ public void testClusterIdIsValid() throws IOException {
+        MetadataResponse metadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics().build((short) 4));
+ TestUtils.isValidClusterId(metadataResponse.clusterId());
+ }
+
+ @ClusterTest
+ public void testRack() throws IOException {
+        MetadataResponse metadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics().build((short) 4));
+        // Validate rack matches what's set in the serverProperties above
+        metadataResponse.brokers().forEach(broker -> {
+            assertEquals(String.format("rack/%d", broker.id()), broker.rack(), "Rack information should match config");
+        });
+ }
+
+ @ClusterTest
+ public void testIsInternal() throws InterruptedException, IOException {
+ String internalTopic = Topic.GROUP_METADATA_TOPIC_NAME;
+ String notInternalTopic = "notInternal";
+ // create the topics
+ clusterInstance.createTopic(internalTopic, 3, (short) 2);
+ clusterInstance.createTopic(notInternalTopic, 3, (short) 2);
+
+        MetadataResponse metadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics().build((short) 4));
+        assertTrue(metadataResponse.errors().isEmpty(), "Response should have no errors");
+
+        Collection<MetadataResponse.TopicMetadata> topicMetadata = metadataResponse.topicMetadata();
+        MetadataResponse.TopicMetadata internalTopicMetadata = topicMetadata.stream().filter(metadata -> metadata.topic().equals(internalTopic)).findFirst().get();
+        MetadataResponse.TopicMetadata notInternalTopicMetadata = topicMetadata.stream().filter(metadata -> metadata.topic().equals(notInternalTopic)).findFirst().get();
+
+        assertTrue(internalTopicMetadata.isInternal(), "internalTopic should show isInternal");
+        assertFalse(notInternalTopicMetadata.isInternal(), "notInternalTopic should not show isInternal");
+
+        assertEquals(Set.of(internalTopic), metadataResponse.buildCluster().internalTopics());
+ }
+
+ @ClusterTest
+    public void testNoTopicsRequest() throws InterruptedException, IOException {
+        // create some topics
+        clusterInstance.createTopic("t1", 3, (short) 2);
+        clusterInstance.createTopic("t2", 3, (short) 2);
+
+        MetadataResponse metadataResponse = sendMetadataRequest(new MetadataRequest.Builder(Collections.emptyList(), true, (short) 4).build());
+        assertTrue(metadataResponse.errors().isEmpty(), "Response should have no errors");
+        assertTrue(metadataResponse.topicMetadata().isEmpty(), "Response should have no topics");
+ }
+
+ @ClusterTest
+    public void testAutoTopicCreation() throws InterruptedException, IOException {
+        String topic1 = "t1";
+        String topic2 = "t2";
+        String topic3 = "t3";
+        String topic4 = "t4";
+        String topic5 = "t5";
+        clusterInstance.createTopic(topic1, 1, (short) 1);
+
+        MetadataResponse response1 = sendMetadataRequest(new MetadataRequest.Builder(List.of(topic1, topic2), true).build());
+        assertNull(response1.errors().get(topic1));
+        checkAutoCreatedTopic(topic2, response1);
+
+        // The default behavior in old versions of the metadata API is to allow topic creation, so
+        // protocol downgrades should happen gracefully when auto-creation is explicitly requested.
+        MetadataResponse response2 = sendMetadataRequest(new MetadataRequest.Builder(List.of(topic3), true).build((short) 1));
+        checkAutoCreatedTopic(topic3, response2);
+
+        // V3 doesn't support a configurable allowAutoTopicCreation, so disabling auto-creation is not supported
+        assertThrows(UnsupportedVersionException.class, () -> sendMetadataRequest(new MetadataRequest.Builder(List.of(topic4), false, (short) 3).build()));
+
+        // V4 and higher support a configurable allowAutoTopicCreation
+        MetadataResponse response3 = sendMetadataRequest(new MetadataRequest.Builder(List.of(topic4, topic5), false, (short) 4).build());
+        assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response3.errors().get(topic4));
+        assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response3.errors().get(topic5));
+ }
+
+    @ClusterTest(brokers = 3, serverProperties = {@ClusterConfigProperty(key = ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, value = "3")})
+    public void testAutoCreateTopicWithInvalidReplicationFactor() throws IOException {
+        // Shutdown all but one broker so that the number of brokers is less than the default replication factor
+        for (int i = 1; i < brokers().size(); i++) {
+            KafkaBroker broker = brokers().get(i);
+            broker.shutdown();
+            broker.awaitShutdown();
+        }
+
+        String topic1 = "testAutoCreateTopic";
+        MetadataResponse response1 = sendMetadataRequest(new MetadataRequest.Builder(List.of(topic1), true).build());
+        assertEquals(1, response1.topicMetadata().size());
+        MetadataResponse.TopicMetadata topicMetadata = response1.topicMetadata().stream().toList().get(0);
+        assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, topicMetadata.error());
+        assertEquals(topic1, topicMetadata.topic());
+        assertEquals(0, topicMetadata.partitionMetadata().size());
+ }
+
+ @ClusterTest
+    public void testAllTopicsRequest() throws InterruptedException, IOException {
+        // create some topics
+        clusterInstance.createTopic("t1", 3, (short) 2);
+        clusterInstance.createTopic("t2", 3, (short) 2);
+
+        // v0, Empty list represents all topics
+        MetadataResponse metadataResponseV0 = sendMetadataRequest(new MetadataRequest(requestData(Collections.emptyList(), true), (short) 0));
+        assertTrue(metadataResponseV0.errors().isEmpty(), "V0 Response should have no errors");
+        assertEquals(2, metadataResponseV0.topicMetadata().size(), "V0 Response should have 2 (all) topics");
+
+        // v1, Null represents all topics
+        MetadataResponse metadataResponseV1 = sendMetadataRequest(MetadataRequest.Builder.allTopics().build((short) 1));
+        assertTrue(metadataResponseV1.errors().isEmpty(), "V1 Response should have no errors");
+        assertEquals(2, metadataResponseV1.topicMetadata().size(), "V1 Response should have 2 (all) topics");
+ }
+
+ @ClusterTest
+    public void testTopicIdsInResponse() throws ExecutionException, InterruptedException, IOException {
+        Map<Integer, List<Integer>> replicaAssignment = Map.of(0, List.of(1, 2, 0), 1, List.of(2, 0, 1));
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+        clusterInstance.createTopicWithAssignment(topic1, replicaAssignment);
+        clusterInstance.createTopicWithAssignment(topic2, replicaAssignment);
+
+        // if version < 9, return ZERO_UUID in MetadataResponse
+        MetadataResponse resp1 = sendMetadataRequest(new MetadataRequest.Builder(List.of(topic1, topic2), true, (short) 0, (short) 9).build());
+        assertEquals(2, resp1.topicMetadata().size());
+        resp1.topicMetadata().forEach(topicMetadata -> {
+            assertEquals(Errors.NONE, topicMetadata.error());
+            assertEquals(Uuid.ZERO_UUID, topicMetadata.topicId());
+        });
+
+        // from version 10, UUID will be included in MetadataResponse
+        MetadataResponse resp2 = sendMetadataRequest(new MetadataRequest.Builder(List.of(topic1, topic2), true, (short) 10, (short) 10).build());
+        assertEquals(2, resp2.topicMetadata().size());
+        resp2.topicMetadata().forEach(topicMetadata -> {
+            assertEquals(Errors.NONE, topicMetadata.error());
+            assertNotEquals(Uuid.ZERO_UUID, topicMetadata.topicId());
+            assertNotNull(topicMetadata.topicId());
+        });
+ }
+
+ /**
+ * Preferred replica should be the first item in the replicas list
+ */
+ @ClusterTest
+    public void testPreferredReplica() throws ExecutionException, InterruptedException, IOException {
+        Map<Integer, List<Integer>> replicaAssignment = Map.of(0, List.of(1, 2, 0), 1, List.of(2, 0, 1));
+        clusterInstance.createTopicWithAssignment("t1", replicaAssignment);
+        // Test metadata on two different brokers to ensure that metadata propagation works correctly
+        List<MetadataResponse> responses = List.of(
+            sendMetadataRequest(new MetadataRequest.Builder(List.of("t1"), true).build(), brokers().get(0).socketServer()),
+            sendMetadataRequest(new MetadataRequest.Builder(List.of("t1"), true).build(), brokers().get(1).socketServer())
+        );
+        responses.forEach(response -> {
+            assertEquals(1, response.topicMetadata().size());
+            MetadataResponse.TopicMetadata topicMetadata = response.topicMetadata().iterator().next();
+            assertEquals(Errors.NONE, topicMetadata.error());
+            assertEquals("t1", topicMetadata.topic());
+            assertEquals(Set.of(0, 1), topicMetadata.partitionMetadata().stream().map(MetadataResponse.PartitionMetadata::partition).collect(Collectors.toSet()));
+            topicMetadata.partitionMetadata().forEach(partitionMetadata -> {
+                List<Integer> assignment = replicaAssignment.get(partitionMetadata.partition());
+                assertEquals(assignment, partitionMetadata.replicaIds);
+                assertEquals(assignment, partitionMetadata.inSyncReplicaIds);
+                assertEquals(Optional.of(assignment.get(0)), partitionMetadata.leaderId);
+            });
+        });
+ }
+
+ @ClusterTest
+    public void testPartitionInfoPreferredReplica() throws ExecutionException, InterruptedException, IOException {
+        Map<Integer, List<Integer>> replicaAssignment = Map.of(0, List.of(1, 2, 0));
+        String topic = "testPartitionInfoPreferredReplicaTopic";
+        clusterInstance.createTopicWithAssignment(topic, replicaAssignment);
+
+        MetadataResponse response = sendMetadataRequest(new MetadataRequest.Builder(List.of(topic), true).build());
+        Cluster snapshot = response.buildCluster();
+        List<PartitionInfo> partitionInfos = snapshot.partitionsForTopic(topic);
+        assertEquals(1, partitionInfos.size());
+
+        PartitionInfo partitionInfo = partitionInfos.get(0);
+        Integer preferredReplicaId = replicaAssignment.get(partitionInfo.partition()).get(0);
+        assertEquals(preferredReplicaId, partitionInfo.replicas()[0].id());
+ }
+
+ @ClusterTest
+    public void testReplicaDownResponse() throws InterruptedException, IOException {
+        String replicaDownTopic = "replicaDown";
+        short replicaCount = 3;
+
+        // create a topic with 3 replicas
+        clusterInstance.createTopic(replicaDownTopic, 1, replicaCount);
+
+        // Kill a replica node that is not the leader
+        MetadataResponse metadataResponse = sendMetadataRequest(new MetadataRequest.Builder(List.of(replicaDownTopic), true).build());
+        MetadataResponse.PartitionMetadata partitionMetadata = metadataResponse.topicMetadata().iterator().next().partitionMetadata().get(0);
+        KafkaBroker downNode = brokers().stream().filter(broker -> {
+            int serverId = broker.dataPlaneRequestProcessor().brokerId();
+            Optional<Integer> leaderId = partitionMetadata.leaderId;
+            List<Integer> replicaIds = partitionMetadata.replicaIds;
+            return leaderId.isPresent() && leaderId.get() != serverId && replicaIds.contains(serverId);
+        }).findFirst().orElse(null);
+        assertNotNull(downNode);
+        downNode.shutdown();
+
+        TestUtils.waitForCondition(() -> {
+            MetadataResponse response = sendMetadataRequest(new MetadataRequest.Builder(List.of(replicaDownTopic), true).build());
+            return response.brokers().stream().noneMatch(broker -> broker.id() == downNode.dataPlaneRequestProcessor().brokerId());
+        }, 50000, "Replica was not found down");
+
+        // Validate version 0 still filters unavailable replicas and contains error
+        MetadataResponse v0MetadataResponse = sendMetadataRequest(new MetadataRequest(requestData(List.of(replicaDownTopic), true), (short) 0));
+        List<Integer> v0BrokerIds = v0MetadataResponse.brokers().stream().map(Node::id).toList();
+        assertTrue(v0MetadataResponse.errors().isEmpty(), "Response should have no errors");
+        assertFalse(v0BrokerIds.contains(downNode.config().brokerId()), "The downed broker should not be in the brokers list");
+        assertEquals(1, v0MetadataResponse.topicMetadata().size(), "Response should have one topic");
+        MetadataResponse.PartitionMetadata v0PartitionMetadata = v0MetadataResponse.topicMetadata().iterator().next().partitionMetadata().get(0);
+        assertEquals(Errors.REPLICA_NOT_AVAILABLE, v0PartitionMetadata.error, "PartitionMetadata should have an error");
+        assertEquals(replicaCount - 1, v0PartitionMetadata.replicaIds.size(), "Response should have %d replicas".formatted(replicaCount - 1));
+
+        // Validate version 1 returns unavailable replicas with no error
+        MetadataResponse v1MetadataResponse = sendMetadataRequest(new MetadataRequest.Builder(List.of(replicaDownTopic), true).build((short) 1));
+        List<Integer> v1BrokerIds = v1MetadataResponse.brokers().stream().map(Node::id).toList();
+        assertTrue(v1MetadataResponse.errors().isEmpty(), "Response should have no errors");
+        assertFalse(v1BrokerIds.contains(downNode.config().brokerId()), "The downed broker should not be in the brokers list");
+        assertEquals(1, v1MetadataResponse.topicMetadata().size(), "Response should have one topic");
+        MetadataResponse.PartitionMetadata v1PartitionMetadata = v1MetadataResponse.topicMetadata().iterator().next().partitionMetadata().get(0);
+        assertEquals(Errors.NONE, v1PartitionMetadata.error, "PartitionMetadata should have no errors");
+        assertEquals(replicaCount, v1PartitionMetadata.replicaIds.size(), "Response should have %d replicas".formatted(replicaCount));
+ }
+
+    private void checkIsr(List<KafkaBroker> brokers, String topic) {
+        List<KafkaBroker> activeBrokers = brokers.stream().filter(broker -> broker.brokerState() != BrokerState.NOT_RUNNING).toList();
+        Set<Integer> expectedIsr = activeBrokers.stream().map(broker -> broker.config().brokerId()).collect(Collectors.toSet());
+
+        // Assert that topic metadata at new brokers is updated correctly
+        activeBrokers.forEach(broker -> {
+            try {
+                TestUtils.waitForCondition(() -> {
+                    MetadataResponse response = sendMetadataRequest(new MetadataRequest.Builder(List.of(topic), false).build(), broker.socketServer());
+                    MetadataResponse.PartitionMetadata firstPartitionMetadata = response.topicMetadata().iterator().next().partitionMetadata().get(0);
+                    Set<Integer> actualIsr = new HashSet<>(firstPartitionMetadata.inSyncReplicaIds);
+                    return actualIsr.equals(expectedIsr);
+                }, "Topic metadata not updated correctly in broker %s\n".formatted(broker));
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @ClusterTest
+    public void testIsrAfterBrokerShutDownAndJoinsBack() throws InterruptedException {
+ String topic = "isr-after-broker-shutdown";
+ short replicaCount = 3;
+ clusterInstance.createTopic(topic, 1, replicaCount);
+
+ brokers().get(0).shutdown();
+ brokers().get(0).awaitShutdown();
+ brokers().get(0).startup();
+ checkIsr(brokers(), topic);
+ }
+
+    private void checkMetadata(List<KafkaBroker> brokers, int expectedBrokersCount) throws InterruptedException, IOException {
+        TestUtils.waitForCondition(() -> {
+            MetadataResponse response = sendMetadataRequest(MetadataRequest.Builder.allTopics().build());
+            return response.brokers().size() == expectedBrokersCount;
+        }, "Expected " + expectedBrokersCount + " brokers");
+
+        MetadataResponse balancedResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics().build());
+        Set<Integer> expectedBrokerIds = balancedResponse.brokers().stream().map(Node::id).collect(Collectors.toSet());
+
+        // Assert that metadata is propagated correctly
+        brokers.stream().filter(broker -> broker.brokerState() == BrokerState.RUNNING).forEach(broker -> {
+            try {
+                TestUtils.waitForCondition(() -> {
+                    MetadataResponse response = sendMetadataRequest(MetadataRequest.Builder.allTopics().build());
+                    Set<Integer> actualBrokerIds = response.brokers().stream().map(Node::id).collect(Collectors.toSet());
+                    return actualBrokerIds.equals(expectedBrokerIds);
+                }, "Topic metadata not updated correctly");
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @ClusterTest
+    public void testAliveBrokersWithNoTopics() throws IOException, InterruptedException {
+ brokers().get(0).shutdown();
+ brokers().get(0).awaitShutdown();
+ checkMetadata(brokers(), brokers().size() - 1);
+
+ brokers().get(0).startup();
+ checkMetadata(brokers(), brokers().size());
+ }
+}
\ No newline at end of file
diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
index f252ed1a1a7..8ff455c611e 100644
--- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
+++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
@@ -327,6 +327,14 @@ public interface ClusterInstance {
}
}
+    default void createTopicWithAssignment(String topicName, Map<Integer, List<Integer>> replicaAssignment) throws InterruptedException {
+        try (Admin admin = admin()) {
+            admin.createTopics(List.of(new NewTopic(topicName, replicaAssignment)));
+            int partitions = replicaAssignment.size();
+            waitTopicCreation(topicName, partitions);
+        }
+    }
+
/**
* Deletes a topic and waits for the deletion to complete.
*