This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.6 by this push:
new a1d1834942c KAFKA-15780: Wait for consistent KRaft metadata when
creating or deleting topics (#14695) (#14713)
a1d1834942c is described below
commit a1d1834942c0bc4252b2931c73cab6598445f740
Author: Justine Olshan <[email protected]>
AuthorDate: Mon Nov 13 09:26:23 2023 -0800
KAFKA-15780: Wait for consistent KRaft metadata when creating or deleting
topics (#14695) (#14713)
TestUtils.createTopicWithAdmin calls waitForAllPartitionsMetadata, which
waits for the partition(s) to be present in each broker's metadata cache. This is a
sufficient check in ZK mode because the controller sends a LeaderAndIsr (LISR)
request before sending an UpdateMetadataRequest, which means that the partition in
the ReplicaManager will be updated before the metadata cache.
In KRaft mode, the metadata cache is updated first, so the check may return
before partitions and other metadata listeners are fully initialized.
Testing:
Insert a Thread.sleep(100) in BrokerMetadataPublisher.onMetadataUpdate right
after the following lines:
// Publish the new metadata image to the metadata cache.
metadataCache.setImage(newImage)
Then run EdgeCaseRequestTest.testProduceRequestWithNullClientId; the test
will fail locally nearly deterministically. After the change(s), the test no
longer fails.
Conflicts:
core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
Reviewers: Justine Olshan <[email protected]>, Divij Vaidya
<[email protected]>, David Mao <[email protected]>
---
.../kafka/admin/RemoteTopicCrudTest.scala | 36 +++++++++----------
.../kafka/admin/TopicCommandIntegrationTest.scala | 42 +++++++++++-----------
.../kafka/api/BaseProducerSendTest.scala | 20 +++++------
.../kafka/api/EndToEndAuthorizationTest.scala | 2 +-
.../kafka/api/PlaintextAdminIntegrationTest.scala | 4 +--
.../kafka/api/PlaintextProducerSendTest.scala | 8 ++---
.../kafka/api/ProducerCompressionTest.scala | 2 +-
.../server/DynamicBrokerReconfigurationTest.scala | 8 ++---
.../server/FetchFromFollowerIntegrationTest.scala | 3 ++
.../kafka/integration/KafkaServerTestHarness.scala | 9 +++--
.../server/ConsumerGroupHeartbeatRequestTest.scala | 3 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 8 ++++-
.../tiered/storage/TieredStorageTestContext.java | 2 +-
13 files changed, 80 insertions(+), 67 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
index 6d8fbe1bbe7..7b21ef82686 100644
--- a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
@@ -79,7 +79,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "200")
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100")
- TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName,
brokers, numPartitions, numReplicationFactor,
+ TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName,
brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
verifyRemoteLogTopicConfigs(topicConfig)
}
@@ -91,7 +91,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "512")
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "256")
- TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName,
brokers, numPartitions, numReplicationFactor,
+ TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName,
brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
verifyRemoteLogTopicConfigs(topicConfig)
}
@@ -103,7 +103,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "1001")
- TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName,
brokers, numPartitions, numReplicationFactor,
+ TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName,
brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
verifyRemoteLogTopicConfigs(topicConfig)
}
@@ -115,7 +115,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "1025")
- TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName,
brokers, numPartitions, numReplicationFactor,
+ TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName,
brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
verifyRemoteLogTopicConfigs(topicConfig)
}
@@ -128,7 +128,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "200")
assertThrowsException(classOf[InvalidConfigurationException], () =>
- TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName,
brokers, numPartitions, numReplicationFactor,
+ TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName,
brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig))
}
@@ -140,7 +140,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "512")
assertThrowsException(classOf[InvalidConfigurationException], () =>
- TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName,
brokers, numPartitions, numReplicationFactor,
+ TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName,
brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig))
}
@@ -151,7 +151,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact")
assertThrowsException(classOf[InvalidConfigurationException], () =>
- TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName,
brokers, numPartitions, numReplicationFactor,
+ TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName,
brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig))
}
@@ -160,7 +160,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
def testEnableRemoteLogOnExistingTopicTest(quorum: String): Unit = {
val admin = createAdminClient()
val topicConfig = new Properties()
- TestUtils.createTopicWithAdmin(admin, testTopicName, brokers,
numPartitions, numReplicationFactor,
+ TestUtils.createTopicWithAdmin(admin, testTopicName, brokers,
controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
val configs = new util.HashMap[ConfigResource,
util.Collection[AlterConfigOp]]()
@@ -181,11 +181,11 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
val topicConfigWithRemoteStorage = new Properties()
topicConfigWithRemoteStorage.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"true")
val message = assertThrowsException(classOf[InvalidConfigurationException],
- () => TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName,
brokers, numPartitions,
+ () => TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName,
brokers, controllerServers, numPartitions,
numReplicationFactor, topicConfig = topicConfigWithRemoteStorage))
assertTrue(message.getMessage.contains("Tiered Storage functionality is
disabled in the broker"))
- TestUtils.createTopicWithAdmin(admin, testTopicName, brokers,
numPartitions, numReplicationFactor)
+ TestUtils.createTopicWithAdmin(admin, testTopicName, brokers,
controllerServers, numPartitions, numReplicationFactor)
val configs = new util.HashMap[ConfigResource,
util.Collection[AlterConfigOp]]()
configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName),
Collections.singleton(
@@ -203,7 +203,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
val admin = createAdminClient()
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
- TestUtils.createTopicWithAdmin(admin, testTopicName, brokers,
numPartitions, numReplicationFactor,
+ TestUtils.createTopicWithAdmin(admin, testTopicName, brokers,
controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
val configs = new util.HashMap[ConfigResource,
util.Collection[AlterConfigOp]]()
@@ -224,7 +224,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
val admin = createAdminClient()
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
- TestUtils.createTopicWithAdmin(admin, testTopicName, brokers,
numPartitions, numReplicationFactor,
+ TestUtils.createTopicWithAdmin(admin, testTopicName, brokers,
controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
val configs = new util.HashMap[ConfigResource,
util.Collection[AlterConfigOp]]()
@@ -245,7 +245,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
val admin = createAdminClient()
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
- TestUtils.createTopicWithAdmin(admin, testTopicName, brokers,
numPartitions, numReplicationFactor,
+ TestUtils.createTopicWithAdmin(admin, testTopicName, brokers,
controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
// inherited local retention ms is 1000
@@ -265,7 +265,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
val admin = createAdminClient()
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
- TestUtils.createTopicWithAdmin(admin, testTopicName, brokers,
numPartitions, numReplicationFactor,
+ TestUtils.createTopicWithAdmin(admin, testTopicName, brokers,
controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
// inherited local retention bytes is 1024
@@ -288,9 +288,9 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "200")
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100")
- TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName,
brokers, numPartitions, brokerCount,
+ TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName,
brokers, controllerServers, numPartitions, brokerCount,
topicConfig = topicConfig)
- TestUtils.deleteTopicWithAdmin(createAdminClient(), testTopicName, brokers)
+ TestUtils.deleteTopicWithAdmin(createAdminClient(), testTopicName,
brokers, controllerServers)
assertThrowsException(classOf[UnknownTopicOrPartitionException],
() => TestUtils.describeTopic(createAdminClient(), testTopicName),
"Topic should be deleted")
TestUtils.waitUntilTrue(() =>
@@ -304,7 +304,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
val topicConfig = new Properties()
topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"true")
- TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName,
brokers, numPartitions, brokerCount,
+ TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName,
brokers, controllerServers, numPartitions, brokerCount,
topicConfig = topicConfig)
val tsDisabledProps = TestUtils.createBrokerConfigs(1,
zkConnectOrNull).head
@@ -326,7 +326,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
val topicConfig = new Properties()
topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
false.toString)
- TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName,
brokers, numPartitions, brokerCount,
+ TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName,
brokers, controllerServers, numPartitions, brokerCount,
topicConfig = topicConfig)
val tsDisabledProps = TestUtils.createBrokerConfigs(1,
zkConnectOrNull).head
diff --git
a/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
b/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
index 9e1beaf6adf..cf0bcda861d 100644
---
a/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
@@ -264,9 +264,9 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
val topic1 = "kafka.testTopic1"
val topic2 = "kafka.testTopic2"
val topic3 = "oooof.testTopic1"
- TestUtils.createTopicWithAdmin(adminClient, topic1, brokers, 2, 2)
- TestUtils.createTopicWithAdmin(adminClient, topic2, brokers, 2, 2)
- TestUtils.createTopicWithAdmin(adminClient, topic3, brokers, 2, 2)
+ TestUtils.createTopicWithAdmin(adminClient, topic1, brokers,
controllerServers, 2, 2)
+ TestUtils.createTopicWithAdmin(adminClient, topic2, brokers,
controllerServers, 2, 2)
+ TestUtils.createTopicWithAdmin(adminClient, topic3, brokers,
controllerServers, 2, 2)
val output = TestUtils.grabConsoleOutput(
topicService.listTopics(new TopicCommandOptions(Array("--topic",
"kafka.*"))))
@@ -280,8 +280,8 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
@ValueSource(strings = Array("zk", "kraft"))
def testListTopicsWithExcludeInternal(quorum: String): Unit = {
val topic1 = "kafka.testTopic1"
- TestUtils.createTopicWithAdmin(adminClient, topic1, brokers, 2, 2)
- TestUtils.createTopicWithAdmin(adminClient,
Topic.GROUP_METADATA_TOPIC_NAME, brokers, 2, 2)
+ TestUtils.createTopicWithAdmin(adminClient, topic1, brokers,
controllerServers, 2, 2)
+ TestUtils.createTopicWithAdmin(adminClient,
Topic.GROUP_METADATA_TOPIC_NAME, brokers, controllerServers, 2, 2)
val output = TestUtils.grabConsoleOutput(
topicService.listTopics(new
TopicCommandOptions(Array("--exclude-internal"))))
@@ -293,7 +293,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testAlterPartitionCount(quorum: String): Unit = {
- TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 2, 2)
+ TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers,
controllerServers, 2, 2)
topicService.alterTopic(new TopicCommandOptions(
Array("--topic", testTopicName, "--partitions", "3")))
@@ -308,7 +308,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testAlterAssignment(quorum: String): Unit = {
- TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 2, 2)
+ TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers,
controllerServers, 2, 2)
topicService.alterTopic(new TopicCommandOptions(
Array("--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2",
"--partitions", "3")))
@@ -324,7 +324,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testAlterAssignmentWithMoreAssignmentThanPartitions(quorum: String):
Unit = {
- TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 2, 2)
+ TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers,
controllerServers, 2, 2)
assertThrows(classOf[ExecutionException],
() => topicService.alterTopic(new TopicCommandOptions(
@@ -334,7 +334,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testAlterAssignmentWithMorePartitionsThanAssignment(quorum: String):
Unit = {
- TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 2, 2)
+ TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers,
controllerServers, 2, 2)
assertThrows(classOf[ExecutionException],
() => topicService.alterTopic(new TopicCommandOptions(
@@ -520,7 +520,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testDescribe(quorum: String): Unit = {
- TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 2, 2)
+ TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers,
controllerServers, 2, 2)
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new TopicCommandOptions(Array("--topic",
testTopicName))))
@@ -545,7 +545,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testDescribeUnavailablePartitions(quorum: String): Unit = {
- TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers,
numBrokers, 1)
+ TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers,
controllerServers, numBrokers, 1)
try {
// check which partition is on broker 0 which we'll kill
@@ -587,7 +587,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testDescribeUnderReplicatedPartitions(quorum: String): Unit = {
- TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 1,
numBrokers)
+ TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers,
controllerServers, 1, numBrokers)
try {
killBroker(0)
@@ -612,7 +612,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
topicProps.setProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
numBrokers.toString)
// create topic
- TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 1,
numBrokers, topicConfig = topicProps)
+ TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers,
controllerServers, 1, numBrokers, topicConfig = topicProps)
try {
killBroker(0)
@@ -638,7 +638,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
def
testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(quorum:
String): Unit = {
val tp = new TopicPartition(testTopicName, 0)
- TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 1, 1)
+ TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers,
controllerServers, 1, 1)
// Produce multiple batches.
TestUtils.generateAndProduceMessages(brokers, testTopicName, numMessages =
10, acks = -1)
@@ -690,7 +690,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
topicProps.setProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "4")
// create topic
- TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 1,
numBrokers, topicConfig = topicProps)
+ TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers,
controllerServers, 1, numBrokers, topicConfig = topicProps)
try {
killBroker(0)
@@ -735,10 +735,10 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
topicProps.setProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
numBrokers.toString)
// create topic
- TestUtils.createTopicWithAdmin(adminClient, underMinIsrTopic, brokers, 1,
numBrokers, topicConfig = topicProps)
- TestUtils.createTopicWithAdmin(adminClient, notUnderMinIsrTopic, brokers,
1, numBrokers)
- TestUtils.createTopicWithAdmin(adminClient, offlineTopic, brokers, 1,
replicaAssignment = Map(0 -> Seq(0)))
- TestUtils.createTopicWithAdmin(adminClient, fullyReplicatedTopic, brokers,
1, replicaAssignment = Map(0 -> Seq(1, 2, 3)))
+ TestUtils.createTopicWithAdmin(adminClient, underMinIsrTopic, brokers,
controllerServers, 1, numBrokers, topicConfig = topicProps)
+ TestUtils.createTopicWithAdmin(adminClient, notUnderMinIsrTopic, brokers,
controllerServers, 1, numBrokers)
+ TestUtils.createTopicWithAdmin(adminClient, offlineTopic, brokers,
controllerServers, 1, replicaAssignment = Map(0 -> Seq(0)))
+ TestUtils.createTopicWithAdmin(adminClient, fullyReplicatedTopic, brokers,
controllerServers, 1, replicaAssignment = Map(0 -> Seq(1, 2, 3)))
try {
killBroker(0)
@@ -811,7 +811,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
Set(new TopicPartition(testTopicName, 0)).asJava
)
- TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 1, 1)
+ TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers,
controllerServers, 1, 1)
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new TopicCommandOptions(Array("--topic",
testTopicName))))
@@ -823,7 +823,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCreateWithTopicNameCollision(quorum: String): Unit = {
- TestUtils.createTopicWithAdmin(adminClient, "foo_bar", brokers, 1,
numBrokers)
+ TestUtils.createTopicWithAdmin(adminClient, "foo_bar", brokers,
controllerServers, 1, numBrokers)
assertThrows(classOf[InvalidTopicException],
() => topicService.createTopic(new TopicCommandOptions(Array("--topic",
"foo.bar"))))
diff --git
a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 5f51d2bd41b..93f7a7212c4 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -154,7 +154,7 @@ abstract class BaseProducerSendTest extends
KafkaServerTestHarness {
try {
// create topic
- TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2)
+ TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers,
1, 2)
// send a normal record
val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic,
partition, "key".getBytes(StandardCharsets.UTF_8),
@@ -208,7 +208,7 @@ abstract class BaseProducerSendTest extends
KafkaServerTestHarness {
timeoutMs: Long = 20000L): Unit = {
val partition = 0
try {
- TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2)
+ TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers,
1, 2)
val futures = for (i <- 1 to numRecords) yield {
val record = new ProducerRecord(topic, partition,
s"key$i".getBytes(StandardCharsets.UTF_8),
@@ -263,7 +263,7 @@ abstract class BaseProducerSendTest extends
KafkaServerTestHarness {
topicProps.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG,
"LogAppendTime")
else
topicProps.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG,
"CreateTime")
- TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2, topicConfig
= topicProps)
+ TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers,
1, 2, topicConfig = topicProps)
val recordAndFutures = for (i <- 1 to numRecords) yield {
val record = new ProducerRecord(topic, partition, baseTimestamp + i,
s"key$i".getBytes(StandardCharsets.UTF_8),
@@ -296,7 +296,7 @@ abstract class BaseProducerSendTest extends
KafkaServerTestHarness {
try {
// create topic
- TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2)
+ TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers,
1, 2)
// non-blocking send a list of records
val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, null,
"key".getBytes(StandardCharsets.UTF_8),
@@ -329,7 +329,7 @@ abstract class BaseProducerSendTest extends
KafkaServerTestHarness {
val producer = createProducer()
try {
- TestUtils.createTopicWithAdmin(admin, topic, brokers, 2, 2)
+ TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers,
2, 2)
val partition = 1
val now = System.currentTimeMillis()
@@ -373,7 +373,7 @@ abstract class BaseProducerSendTest extends
KafkaServerTestHarness {
val replicas = List(0, follower)
try {
- TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 3, Map(0 ->
replicas))
+ TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers,
1, 3, Map(0 -> replicas))
val partition = 0
val now = System.currentTimeMillis()
@@ -422,7 +422,7 @@ abstract class BaseProducerSendTest extends
KafkaServerTestHarness {
val producer = createProducer(maxBlockMs = 5 * 1000L)
// create topic
- TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2)
+ TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers,
1, 2)
val partition0 = 0
var futures0 = (1 to numRecords).map { i =>
@@ -479,7 +479,7 @@ abstract class BaseProducerSendTest extends
KafkaServerTestHarness {
def testFlush(quorum: String): Unit = {
val producer = createProducer(lingerMs = Int.MaxValue, deliveryTimeoutMs =
Int.MaxValue)
try {
- TestUtils.createTopicWithAdmin(admin, topic, brokers, 2, 2)
+ TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers,
2, 2)
val record = new ProducerRecord[Array[Byte], Array[Byte]](topic,
"value".getBytes(StandardCharsets.UTF_8))
for (_ <- 0 until 50) {
@@ -499,7 +499,7 @@ abstract class BaseProducerSendTest extends
KafkaServerTestHarness {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCloseWithZeroTimeoutFromCallerThread(quorum: String): Unit = {
- TestUtils.createTopicWithAdmin(admin, topic, brokers, 2, 2)
+ TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers,
2, 2)
val partition = 0
consumer.assign(List(new TopicPartition(topic, partition)).asJava)
val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic,
partition, null,
@@ -525,7 +525,7 @@ abstract class BaseProducerSendTest extends
KafkaServerTestHarness {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCloseWithZeroTimeoutFromSenderThread(quorum: String): Unit = {
- TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2)
+ TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers,
1, 2)
val partition = 0
consumer.assign(List(new TopicPartition(topic, partition)).asJava)
val record = new ProducerRecord[Array[Byte], Array[Byte]](topic,
partition, null, "value".getBytes(StandardCharsets.UTF_8))
diff --git
a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 7731efd360f..46e674c00aa 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -167,7 +167,7 @@ abstract class EndToEndAuthorizationTest extends
IntegrationTestHarness with Sas
// create the test topic with all the brokers as replicas
val superuserAdminClient = createSuperuserAdminClient()
- TestUtils.createTopicWithAdmin(admin = superuserAdminClient, topic =
topic, brokers = brokers,
+ TestUtils.createTopicWithAdmin(admin = superuserAdminClient, topic =
topic, brokers = brokers, controllers = controllerServers,
numPartitions = 1, replicationFactor = 3, topicConfig = new Properties)
}
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 784374d23e8..805fdda0e8a 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -2692,11 +2692,11 @@ object PlaintextAdminIntegrationTest {
// Create topics
val topic1 = "invalid-alter-configs-topic-1"
val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
- createTopicWithAdmin(admin, topic1, test.brokers, numPartitions = 1,
replicationFactor = 1)
+ createTopicWithAdmin(admin, topic1, test.brokers, test.controllerServers,
numPartitions = 1, replicationFactor = 1)
val topic2 = "invalid-alter-configs-topic-2"
val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
- createTopicWithAdmin(admin, topic2, test.brokers, numPartitions = 1,
replicationFactor = 1)
+ createTopicWithAdmin(admin, topic2, test.brokers, test.controllerServers,
numPartitions = 1, replicationFactor = 1)
val topicConfigEntries1 = Seq(
new ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "1.1"), //
this value is invalid as it's above 1.0
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index ac7b775c228..77132d919bc 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -66,7 +66,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
val producer = createProducer(batchSize = 0)
val numRecords = 10;
try {
- TestUtils.createTopicWithAdmin(admin, topic, brokers, 2)
+ TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers,
2)
val futures = for (i <- 1 to numRecords) yield {
val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, null,
s"value$i".getBytes(StandardCharsets.UTF_8))
producer.send(record)
@@ -128,7 +128,7 @@ class PlaintextProducerSendTest extends
BaseProducerSendTest {
// set the TopicConfig for timestamp validation to have 1 minute
threshold. Note that recordTimestamp has 5 minutes diff
val oneMinuteInMs: Long = 1 * 60 * 60 * 1000L
topicProps.setProperty(messageTimeStampConfig, oneMinuteInMs.toString)
- TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2, topicConfig =
topicProps)
+ TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers,
1, 2, topicConfig = topicProps)
val producer = createProducer()
try {
@@ -157,7 +157,7 @@ class PlaintextProducerSendTest extends
BaseProducerSendTest {
// set the TopicConfig for timestamp validation to be the same as the
record timestamp
topicProps.setProperty(messageTimeStampConfig, recordTimestamp.toString)
- TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2, topicConfig =
topicProps)
+ TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers,
1, 2, topicConfig = topicProps)
val producer = createProducer()
@@ -178,7 +178,7 @@ class PlaintextProducerSendTest extends
BaseProducerSendTest {
// set the TopicConfig for timestamp validation to have 10 minute
threshold. Note that recordTimestamp has 5 minutes diff
val tenMinutesInMs: Long = 10 * 60 * 60 * 1000L
topicProps.setProperty(messageTimeStampConfig, tenMinutesInMs.toString)
- TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2, topicConfig =
topicProps)
+ TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers,
1, 2, topicConfig = topicProps)
val producer = createProducer()
diff --git
a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index 6135ec952ca..08f10e89083 100755
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -87,7 +87,7 @@ class ProducerCompressionTest extends QuorumTestHarness {
val admin = TestUtils.createAdminClient(Seq(broker),
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
try {
- TestUtils.createTopicWithAdmin(admin, topic, Seq(broker))
+ TestUtils.createTopicWithAdmin(admin, topic, Seq(broker),
controllerServers)
} finally {
admin.close()
}
diff --git
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 3bbfbbed78a..435b87d330d 100644
---
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -156,8 +156,8 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
createAdminClient(SecurityProtocol.SSL, SecureInternal)
- TestUtils.createTopicWithAdmin(adminClients.head, topic, servers,
numPartitions, replicationFactor = numServers)
- TestUtils.createTopicWithAdmin(adminClients.head,
Topic.GROUP_METADATA_TOPIC_NAME, servers,
+ TestUtils.createTopicWithAdmin(adminClients.head, topic, servers,
controllerServers, numPartitions, replicationFactor = numServers)
+ TestUtils.createTopicWithAdmin(adminClients.head,
Topic.GROUP_METADATA_TOPIC_NAME, servers, controllerServers,
numPartitions = servers.head.config.offsetsTopicPartitions,
replicationFactor = numServers,
topicConfig = servers.head.groupCoordinator.groupMetadataTopicConfigs)
@@ -356,7 +356,7 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
@ValueSource(strings = Array("zk", "kraft"))
def testKeyStoreAlter(quorum: String): Unit = {
val topic2 = "testtopic2"
- TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers,
numPartitions, replicationFactor = numServers)
+ TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers,
controllerServers, numPartitions, replicationFactor = numServers)
// Start a producer and consumer that work with the current broker
keystore.
// This should continue working while changes are made
@@ -578,7 +578,7 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
val topic2 = "testtopic2"
val topicProps = new Properties
topicProps.put(KafkaConfig.MinInSyncReplicasProp, "2")
- TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers,
numPartitions = 1, replicationFactor = numServers, topicConfig = topicProps)
+ TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers,
controllerServers, numPartitions = 1, replicationFactor = numServers,
topicConfig = topicProps)
def getLogOrThrow(tp: TopicPartition): UnifiedLog = {
var (logOpt, found) = TestUtils.computeUntilTrue {
diff --git
a/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
index 3b6f28b4f8a..7231c0599a6 100644
---
a/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
@@ -65,6 +65,7 @@ class FetchFromFollowerIntegrationTest extends
BaseFetchRequestTest {
admin,
topic,
brokers,
+ controllerServers,
replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId))
)
@@ -105,6 +106,7 @@ class FetchFromFollowerIntegrationTest extends
BaseFetchRequestTest {
admin,
topic,
brokers,
+ controllerServers,
replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId))
)
@@ -131,6 +133,7 @@ class FetchFromFollowerIntegrationTest extends
BaseFetchRequestTest {
admin,
topic,
brokers,
+ controllerServers,
replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId))
)
diff --git
a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 8775a8323d4..01e6a91eb85 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -152,7 +152,7 @@ abstract class KafkaServerTestHarness extends
QuorumTestHarness {
): Unit = {
if (isKRaftTest()) {
resource(createAdminClient(brokers, listenerName, adminClientConfig)) {
admin =>
- TestUtils.createOffsetsTopicWithAdmin(admin, brokers)
+ TestUtils.createOffsetsTopicWithAdmin(admin, brokers,
controllerServers)
}
} else {
TestUtils.createOffsetsTopic(zkClient, servers)
@@ -178,6 +178,7 @@ abstract class KafkaServerTestHarness extends
QuorumTestHarness {
admin = admin,
topic = topic,
brokers = brokers,
+ controllers = controllerServers,
numPartitions = numPartitions,
replicationFactor = replicationFactor,
topicConfig = topicConfig
@@ -211,7 +212,8 @@ abstract class KafkaServerTestHarness extends
QuorumTestHarness {
admin = admin,
topic = topic,
replicaAssignment = partitionReplicaAssignment,
- brokers = brokers
+ brokers = brokers,
+ controllers = controllerServers
)
}
} else {
@@ -232,7 +234,8 @@ abstract class KafkaServerTestHarness extends
QuorumTestHarness {
TestUtils.deleteTopicWithAdmin(
admin = admin,
topic = topic,
- brokers = aliveBrokers)
+ brokers = aliveBrokers,
+ controllers = controllerServers)
}
} else {
adminZkClient.deleteTopic(topic)
diff --git
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index 89cddfe97e7..b802a2ad627 100644
---
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -74,7 +74,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
// in this test because it does not use FindCoordinator API.
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
- brokers =
raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala
+ brokers =
raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala,
+ controllers = raftCluster.controllerServers().asScala.toSeq
)
// Heartbeat request to join the group. Note that the member subscribes
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index e4c831deebe..772ac6e5a01 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -455,6 +455,7 @@ object TestUtils extends Logging {
admin: Admin,
topic: String,
brokers: Seq[B],
+ controllers: Seq[ControllerServer],
numPartitions: Int = 1,
replicationFactor: Int = 1,
replicaAssignment: collection.Map[Int, Seq[Int]] = Map.empty,
@@ -492,6 +493,7 @@ object TestUtils extends Logging {
// wait until we've propagated all partitions metadata to all brokers
val allPartitionsMetadata = waitForAllPartitionsMetadata(brokers, topic,
effectiveNumPartitions)
+ controllers.foreach(controller => ensureConsistentKRaftMetadata(brokers,
controller))
(0 until effectiveNumPartitions).map { i =>
i -> allPartitionsMetadata.get(new TopicPartition(topic,
i)).map(_.leader()).getOrElse(
@@ -521,7 +523,8 @@ object TestUtils extends Logging {
def createOffsetsTopicWithAdmin[B <: KafkaBroker](
admin: Admin,
- brokers: Seq[B]
+ brokers: Seq[B],
+ controllers: Seq[ControllerServer]
): Map[Int, Int] = {
val broker = brokers.head
createTopicWithAdmin(
@@ -530,6 +533,7 @@ object TestUtils extends Logging {
numPartitions =
broker.config.getInt(KafkaConfig.OffsetsTopicPartitionsProp),
replicationFactor =
broker.config.getShort(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
brokers = brokers,
+ controllers = controllers,
topicConfig = broker.groupCoordinator.groupMetadataTopicConfigs,
)
}
@@ -538,6 +542,7 @@ object TestUtils extends Logging {
admin: Admin,
topic: String,
brokers: Seq[B],
+ controllers: Seq[ControllerServer]
): Unit = {
try {
admin.deleteTopics(Collections.singletonList(topic)).all().get()
@@ -547,6 +552,7 @@ object TestUtils extends Logging {
// ignore
}
waitForAllPartitionsMetadata(brokers, topic, 0)
+ controllers.foreach(controller => ensureConsistentKRaftMetadata(brokers,
controller))
}
/**
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
index 59acae74ad3..8d475fbfe3c 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
@@ -179,7 +179,7 @@ public final class TieredStorageTestContext implements
AutoCloseable {
}
public void deleteTopic(String topic) {
- TestUtils.deleteTopicWithAdmin(admin, topic, harness.brokers());
+ TestUtils.deleteTopicWithAdmin(admin, topic, harness.brokers(),
harness.controllerServers());
}
/**