This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d5df4d58a32 KAFKA-19755 Move KRaftClusterTest from core module to server module [4/4] (#21230)
d5df4d58a32 is described below
commit d5df4d58a32399711efdde2d134fe49489d8b6c3
Author: Lan Ding <[email protected]>
AuthorDate: Fri Jan 9 21:53:59 2026 +0800
KAFKA-19755 Move KRaftClusterTest from core module to server module [4/4] (#21230)
Move KRaftClusterTest from core module to server module.
Rewrite
- testCreateClusterAndRestartControllerNode
- testSnapshotCount
- testSingleControllerSingleBrokerCluster
- testOverlyLargeCreateTopics
- testTimedOutHeartbeats
- testRegisteredControllerEndpoints
- testDirectToControllerCommunicationFailsOnOlderMetadataVersion
- testStartupWithNonDefaultKControllerDynamicConfiguration
- testTopicDeletedAndRecreatedWhileBrokerIsDown
- testAbandonedFutureReplicaRecovered_mainReplicaInOfflineLogDir
- testAbandonedFutureReplicaRecovered_mainReplicaInOnlineLogDir
- testControllerFailover
- testOldBootstrapMetadataFile
- testIncreaseNumIoThreads
Reviewers: Chia-Ping Tsai <[email protected]>
---
checkstyle/import-control-server.xml | 1 +
core/src/main/scala/kafka/server/KafkaConfig.scala | 5 +
.../kafka/server/KRaftClusterTest.scala | 583 ---------------------
.../apache/kafka/controller/QuorumController.java | 2 +-
.../org/apache/kafka/server/KRaftClusterTest.java | 515 ++++++++++++++++++
5 files changed, 522 insertions(+), 584 deletions(-)
diff --git a/checkstyle/import-control-server.xml b/checkstyle/import-control-server.xml
index 5c99f2539aa..4b087d70e36 100644
--- a/checkstyle/import-control-server.xml
+++ b/checkstyle/import-control-server.xml
@@ -68,6 +68,7 @@
<!-- utilities for testing -->
<allow pkg="org.apache.kafka.test" />
+ <allow class="org.apache.kafka.controller.QuorumController" />
<!-- persistent collection factories/non-library-specific wrappers -->
<allow pkg="org.apache.kafka.server.immutable" exact-match="true" />
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index c469f53bdda..706a359afed 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -166,6 +166,11 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
this.currentConfig = newConfig
}
+ // for testing
+ private[server] def updateCurrentConfig(props: util.Map[_, _]): Unit = {
+ this.currentConfig = new KafkaConfig(props)
+ }
+
override def originals: util.Map[String, AnyRef] =
if (this eq currentConfig) super.originals else currentConfig.originals
override def values: util.Map[String, _] =
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
deleted file mode 100644
index 24468d2395f..00000000000
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ /dev/null
@@ -1,583 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.AlterConfigOp.OpType
-import org.apache.kafka.clients.admin._
-import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.config.ConfigResource.Type
-import org.apache.kafka.common.errors.{PolicyViolationException, UnsupportedVersionException}
-import org.apache.kafka.common.metadata.{ConfigRecord, FeatureLevelRecord}
-import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.test.{KafkaClusterTestKit, TestKitNodes}
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.controller.{QuorumController, QuorumControllerIntegrationTestUtils}
-import org.apache.kafka.metadata.BrokerState
-import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
-import org.apache.kafka.network.SocketServerConfigs
-import org.apache.kafka.raft.KRaftConfigs
-import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
-import org.apache.kafka.server.config.ServerConfigs
-import org.apache.kafka.storage.internals.log.UnifiedLog
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{Tag, Test, Timeout}
-
-import java.io.File
-import java.nio.charset.StandardCharsets
-import java.nio.file.{FileSystems, Files, Path, Paths}
-import java.util
-import java.util.concurrent.{ExecutionException, TimeUnit}
-import scala.collection.{Seq, mutable}
-import scala.jdk.CollectionConverters._
-import scala.util.Using
-
-@Timeout(120)
-@Tag("integration")
-class KRaftClusterTest {
- @Test
- def testCreateClusterAndRestartControllerNode(): Unit = {
- val cluster = new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder().
- setNumBrokerNodes(1).
- setNumControllerNodes(3).build()).build()
- try {
- cluster.format()
- cluster.startup()
- val controller = cluster.controllers().values().iterator().asScala.filter(_.controller.isActive).next()
- val port = controller.socketServer.boundPort(ListenerName.normalised(controller.config.controllerListeners.head.listener))
-
- // shutdown active controller
- controller.shutdown()
- // Rewrite The `listeners` config to avoid controller socket server init using different port
- val config = controller.sharedServer.controllerConfig.props
- config.asInstanceOf[util.HashMap[String,String]].put(SocketServerConfigs.LISTENERS_CONFIG, s"CONTROLLER://localhost:$port")
- controller.sharedServer.controllerConfig.updateCurrentConfig(new KafkaConfig(config))
-
- // restart controller
- controller.startup()
- TestUtils.waitUntilTrue(() => cluster.controllers().values().iterator().asScala.exists(_.controller.isActive), "Timeout waiting for new controller election")
- } finally {
- cluster.close()
- }
- }
-
- private def waitForTopicListing(admin: Admin,
- expectedPresent: Seq[String],
- expectedAbsent: Seq[String]): Unit = {
- val topicsNotFound = new util.HashSet[String]
- var extraTopics: mutable.Set[String] = null
- expectedPresent.foreach(topicsNotFound.add)
- TestUtils.waitUntilTrue(() => {
- admin.listTopics().names().get().forEach(name => topicsNotFound.remove(name))
- extraTopics = admin.listTopics().names().get().asScala.filter(expectedAbsent.contains(_))
- topicsNotFound.isEmpty && extraTopics.isEmpty
- }, s"Failed to find topic(s): ${topicsNotFound.asScala} and NOT find
topic(s): $extraTopics")
- }
-
- @Test
- def testSnapshotCount(): Unit = {
- val cluster = new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder().
- setNumBrokerNodes(0).
- setNumControllerNodes(1).build())
- .setConfigProp("metadata.log.max.snapshot.interval.ms", "500")
- .setConfigProp("metadata.max.idle.interval.ms", "50") // Set this low to
generate metadata
- .build()
-
- try {
- cluster.format()
- cluster.startup()
- def snapshotCounter(path: Path): Long = {
- path.toFile.listFiles((_: File, name: String) => {
- name.toLowerCase.endsWith("checkpoint")
- }).length
- }
-
- val metaLog = FileSystems.getDefault.getPath(cluster.controllers().get(3000).config.metadataLogDir, "__cluster_metadata-0")
- TestUtils.waitUntilTrue(() => { snapshotCounter(metaLog) > 0 }, "Failed to see at least one snapshot")
- Thread.sleep(500 * 10) // Sleep for 10 snapshot intervals
- val countAfterTenIntervals = snapshotCounter(metaLog)
- assertTrue(countAfterTenIntervals > 1, s"Expected to see at least one more snapshot, saw $countAfterTenIntervals")
- assertTrue(countAfterTenIntervals < 20, s"Did not expect to see more than twice as many snapshots as snapshot intervals, saw $countAfterTenIntervals")
- TestUtils.waitUntilTrue(() => {
- val emitterMetrics = cluster.controllers().values().iterator().next().
- sharedServer.snapshotEmitter.metrics()
- emitterMetrics.latestSnapshotGeneratedBytes() > 0
- }, "Failed to see latestSnapshotGeneratedBytes > 0")
- } finally {
- cluster.close()
- }
- }
-
- /**
- * Test a single broker, single controller cluster at the minimum bootstrap level. This tests
- * that we can function without having periodic NoOpRecords written.
- */
- @Test
- def testSingleControllerSingleBrokerCluster(): Unit = {
- val cluster = new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder().
- setBootstrapMetadataVersion(MetadataVersion.MINIMUM_VERSION).
- setNumBrokerNodes(1).
- setNumControllerNodes(1).build()).build()
- try {
- cluster.format()
- cluster.startup()
- cluster.waitForReadyBrokers()
- } finally {
- cluster.close()
- }
- }
-
- @Test
- def testOverlyLargeCreateTopics(): Unit = {
- val cluster = new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder().
- setNumBrokerNodes(1).
- setNumControllerNodes(1).build()).build()
- try {
- cluster.format()
- cluster.startup()
- val admin = cluster.admin()
- try {
- val newTopics = new util.ArrayList[NewTopic]()
- for (i <- 0 to 10000) {
- newTopics.add(new NewTopic("foo" + i, 100000, 1.toShort))
- }
- val executionException = assertThrows(classOf[ExecutionException],
- () => admin.createTopics(newTopics).all().get())
- assertNotNull(executionException.getCause)
- assertEquals(classOf[PolicyViolationException], executionException.getCause.getClass)
- assertEquals("Excessively large number of partitions per request.",
- executionException.getCause.getMessage)
- } finally {
- admin.close()
- }
- } finally {
- cluster.close()
- }
- }
-
- @Test
- def testTimedOutHeartbeats(): Unit = {
- val cluster = new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder().
- setNumBrokerNodes(3).
- setNumControllerNodes(1).build()).
- setConfigProp(KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_CONFIG, 10.toString).
- setConfigProp(KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_CONFIG, 1000.toString).
- build()
- try {
- cluster.format()
- cluster.startup()
- val controller = cluster.controllers().values().iterator().next()
- controller.controller.waitForReadyBrokers(3).get()
- TestUtils.retry(60000) {
- val latch = QuorumControllerIntegrationTestUtils.pause(controller.controller.asInstanceOf[QuorumController])
- Thread.sleep(1001)
- latch.countDown()
- assertEquals(0, controller.sharedServer.controllerServerMetrics.fencedBrokerCount())
- assertTrue(controller.quorumControllerMetrics.timedOutHeartbeats() > 0,
- "Expected timedOutHeartbeats to be greater than 0.")
- }
- } finally {
- cluster.close()
- }
- }
-
- @Test
- def testRegisteredControllerEndpoints(): Unit = {
- val cluster = new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder().
- setNumBrokerNodes(1).
- setNumControllerNodes(3).build()).
- build()
- try {
- cluster.format()
- cluster.startup()
- TestUtils.retry(60000) {
- val controller = cluster.controllers().values().iterator().next()
- val registeredControllers = controller.registrationsPublisher.controllers()
- assertEquals(3, registeredControllers.size(), "Expected 3 controller registrations")
- registeredControllers.values().forEach(registration => {
- assertNotNull(registration.listeners.get("CONTROLLER"))
- assertNotEquals(0, registration.listeners.get("CONTROLLER").port())
- })
- }
- } finally {
- cluster.close()
- }
- }
-
- @Test
- def testDirectToControllerCommunicationFailsOnOlderMetadataVersion(): Unit = {
- val cluster = new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder().
- setBootstrapMetadataVersion(MetadataVersion.IBP_3_6_IV2).
- setNumBrokerNodes(1).
- setNumControllerNodes(1).build()).
- build()
- try {
- cluster.format()
- cluster.startup()
- val admin = cluster.admin(util.Map.of(), true)
- try {
- val exception = assertThrows(classOf[ExecutionException],
- () => admin.describeCluster().clusterId().get(1, TimeUnit.MINUTES))
- assertNotNull(exception.getCause)
- assertEquals(classOf[UnsupportedVersionException], exception.getCause.getClass)
- } finally {
- admin.close()
- }
- } finally {
- cluster.close()
- }
- }
-
- @Test
- def testStartupWithNonDefaultKControllerDynamicConfiguration(): Unit = {
- val bootstrapRecords = util.List.of(
- new ApiMessageAndVersion(new FeatureLevelRecord().
- setName(MetadataVersion.FEATURE_NAME).
- setFeatureLevel(MetadataVersion.IBP_3_7_IV0.featureLevel), 0.toShort),
- new ApiMessageAndVersion(new ConfigRecord().
- setResourceType(ConfigResource.Type.BROKER.id).
- setResourceName("").
- setName("num.io.threads").
- setValue("9"), 0.toShort))
- val cluster = new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder(BootstrapMetadata.fromRecords(bootstrapRecords, "testRecords")).
- setNumBrokerNodes(1).
- setNumControllerNodes(1).build()).
- build()
- try {
- cluster.format()
- cluster.startup()
- val controller = cluster.controllers().values().iterator().next()
- TestUtils.retry(60000) {
- assertNotNull(controller.controllerApisHandlerPool)
- assertEquals(9, controller.controllerApisHandlerPool.threadPoolSize.get())
- }
- } finally {
- cluster.close()
- }
- }
-
- @Test
- def testTopicDeletedAndRecreatedWhileBrokerIsDown(): Unit = {
- val cluster = new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder().
- setBootstrapMetadataVersion(MetadataVersion.IBP_3_6_IV2).
- setNumBrokerNodes(3).
- setNumControllerNodes(1).build()).
- build()
- try {
- cluster.format()
- cluster.startup()
- val admin = cluster.admin()
- try {
- val broker0 = cluster.brokers().get(0)
- val broker1 = cluster.brokers().get(1)
- val foo0 = new TopicPartition("foo", 0)
-
- admin.createTopics(util.List.of(
- new NewTopic("foo", 3, 3.toShort))).all().get()
-
- // Wait until foo-0 is created on broker0.
- TestUtils.retry(60000) {
- assertTrue(broker0.logManager.getLog(foo0).isDefined)
- }
-
- // Shut down broker0 and wait until the ISR of foo-0 is set to [1, 2]
- broker0.shutdown()
- TestUtils.retry(60000) {
- val info = broker1.metadataCache.getLeaderAndIsr("foo", 0)
- assertTrue(info.isPresent)
- assertEquals(Set(1, 2), info.get.isr().asScala.toSet)
- }
-
- // Modify foo-0 so that it has the wrong topic ID.
- val logDir = broker0.logManager.getLog(foo0).get.dir
- val partitionMetadataFile = new File(logDir, "partition.metadata")
- Files.write(partitionMetadataFile.toPath,
- "version: 0\ntopic_id:
AAAAAAAAAAAAA7SrBWaJ7g\n".getBytes(StandardCharsets.UTF_8))
-
- // Start up broker0 and wait until the ISR of foo-0 is set to [0, 1, 2]
- broker0.startup()
- TestUtils.retry(60000) {
- val info = broker1.metadataCache.getLeaderAndIsr("foo", 0)
- assertTrue(info.isPresent)
- assertEquals(Set(0, 1, 2), info.get.isr().asScala.toSet)
- }
- } finally {
- admin.close()
- }
- } finally {
- cluster.close()
- }
- }
-
- @Test
- def testAbandonedFutureReplicaRecovered_mainReplicaInOfflineLogDir(): Unit = {
- val cluster = new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder().
- setBootstrapMetadataVersion(MetadataVersion.IBP_3_7_IV2).
- setNumBrokerNodes(3).
- setNumDisksPerBroker(2).
- setNumControllerNodes(1).build()).
- build()
- try {
- cluster.format()
- cluster.startup()
- val admin = cluster.admin()
- try {
- val broker0 = cluster.brokers().get(0)
- val broker1 = cluster.brokers().get(1)
- val foo0 = new TopicPartition("foo", 0)
-
- admin.createTopics(util.List.of(
- new NewTopic("foo", 3, 3.toShort))).all().get()
-
- // Wait until foo-0 is created on broker0.
- TestUtils.retry(60000) {
- assertTrue(broker0.logManager.getLog(foo0).isDefined)
- }
-
- // Shut down broker0 and wait until the ISR of foo-0 is set to [1, 2]
- broker0.shutdown()
- TestUtils.retry(60000) {
- val info = broker1.metadataCache.getLeaderAndIsr("foo", 0)
- assertTrue(info.isPresent)
- assertEquals(Set(1, 2), info.get.isr().asScala.toSet)
- }
-
- // Modify foo-0 so that it refers to a future replica.
- // This is equivalent to a failure during the promotion of the future replica and a restart with directory for
- // the main replica being offline
- val log = broker0.logManager.getLog(foo0).get
- log.renameDir(UnifiedLog.logFutureDirName(foo0), false)
-
- // Start up broker0 and wait until the ISR of foo-0 is set to [0, 1, 2]
- broker0.startup()
- TestUtils.retry(60000) {
- val info = broker1.metadataCache.getLeaderAndIsr("foo", 0)
- assertTrue(info.isPresent)
- assertEquals(Set(0, 1, 2), info.get.isr().asScala.toSet)
- assertTrue(broker0.logManager.getLog(foo0, isFuture = true).isEmpty)
- }
- } finally {
- admin.close()
- }
- } finally {
- cluster.close()
- }
- }
-
- def copyDirectory(src: String, dest: String): Unit = {
- Files.walk(Paths.get(src)).forEach(p => {
- val out = Paths.get(dest, p.toString.substring(src.length()))
- if (!p.toString.equals(src)) {
- Files.copy(p, out)
- }
- })
- }
-
- @Test
- def testAbandonedFutureReplicaRecovered_mainReplicaInOnlineLogDir(): Unit = {
- val cluster = new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder().
- setBootstrapMetadataVersion(MetadataVersion.IBP_3_7_IV2).
- setNumBrokerNodes(3).
- setNumDisksPerBroker(2).
- setNumControllerNodes(1).build()).
- build()
- try {
- cluster.format()
- cluster.startup()
- val admin = cluster.admin()
- try {
- val broker0 = cluster.brokers().get(0)
- val broker1 = cluster.brokers().get(1)
- val foo0 = new TopicPartition("foo", 0)
-
- admin.createTopics(util.List.of(
- new NewTopic("foo", 3, 3.toShort))).all().get()
-
- // Wait until foo-0 is created on broker0.
- TestUtils.retry(60000) {
- assertTrue(broker0.logManager.getLog(foo0).isDefined)
- }
-
- // Shut down broker0 and wait until the ISR of foo-0 is set to [1, 2]
- broker0.shutdown()
- TestUtils.retry(60000) {
- val info = broker1.metadataCache.getLeaderAndIsr("foo", 0)
- assertTrue(info.isPresent)
- assertEquals(Set(1, 2), info.get.isr().asScala.toSet)
- }
-
- val log = broker0.logManager.getLog(foo0).get
-
- // Copy foo-0 to targetParentDir
- // This is so that we can rename the main replica to a future down below
- val parentDir = log.parentDir
- val targetParentDir = broker0.config.logDirs.stream().filter(l => !l.equals(parentDir)).findFirst().get()
- val targetDirFile = new File(targetParentDir, log.dir.getName)
- targetDirFile.mkdir()
- copyDirectory(log.dir.toString, targetDirFile.toString)
- assertTrue(targetDirFile.exists())
-
- // Rename original log to a future
- // This is equivalent to a failure during the promotion of the future replica and a restart with directory for
- // the main replica being online
- val originalLogFile = log.dir
- log.renameDir(UnifiedLog.logFutureDirName(foo0), false)
- assertFalse(originalLogFile.exists())
-
- // Start up broker0 and wait until the ISR of foo-0 is set to [0, 1, 2]
- broker0.startup()
- TestUtils.retry(60000) {
- val info = broker1.metadataCache.getLeaderAndIsr("foo", 0)
- assertTrue(info.isPresent)
- assertEquals(Set(0, 1, 2), info.get.isr().asScala.toSet)
- assertTrue(broker0.logManager.getLog(foo0, isFuture = true).isEmpty)
- assertFalse(targetDirFile.exists())
- assertTrue(originalLogFile.exists())
- }
- } finally {
- admin.close()
- }
- } finally {
- cluster.close()
- }
- }
-
- @Test
- def testControllerFailover(): Unit = {
- val cluster = new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder().
- setNumBrokerNodes(1).
- setNumControllerNodes(5).build()).build()
- try {
- cluster.format()
- cluster.startup()
- cluster.waitForReadyBrokers()
- TestUtils.waitUntilTrue(() => cluster.brokers().get(0).brokerState == BrokerState.RUNNING,
- "Broker never made it to RUNNING state.")
- TestUtils.waitUntilTrue(() => cluster.raftManagers().get(0).client.leaderAndEpoch().leaderId.isPresent,
- "RaftManager was not initialized.")
-
- val admin = cluster.admin()
- try {
- // Create a test topic
- admin.createTopics(util.List.of(
- new NewTopic("test-topic", 1, 1.toShort))).all().get()
- waitForTopicListing(admin, Seq("test-topic"), Seq())
-
- // Shut down active controller
- val active = cluster.waitForActiveController()
- cluster.raftManagers().get(active.asInstanceOf[QuorumController].nodeId()).shutdown()
-
- // Create a test topic on the new active controller
- admin.createTopics(util.List.of(
- new NewTopic("test-topic2", 1, 1.toShort))).all().get()
- waitForTopicListing(admin, Seq("test-topic2"), Seq())
- } finally {
- admin.close()
- }
- } finally {
- cluster.close()
- }
- }
-
- /**
- * Test that once a cluster is formatted, a bootstrap.metadata file that contains an unsupported
- * MetadataVersion is not a problem. This is a regression test for KAFKA-19192.
- */
- @Test
- def testOldBootstrapMetadataFile(): Unit = {
- val baseDirectory = TestUtils.tempDir().toPath()
- Using.resource(new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder().
- setNumBrokerNodes(1).
- setNumControllerNodes(1).
- setBaseDirectory(baseDirectory).
- build()).
- setDeleteOnClose(false).
- build()
- ) { cluster =>
- cluster.format()
- cluster.startup()
- cluster.waitForReadyBrokers()
- }
- val oldBootstrapMetadata = BootstrapMetadata.fromRecords(
- util.List.of(
- new ApiMessageAndVersion(
- new FeatureLevelRecord().
- setName(MetadataVersion.FEATURE_NAME).
- setFeatureLevel(1),
- 0.toShort)
- ),
- "oldBootstrapMetadata")
- // Re-create the cluster using the same directory structure as above.
- // Since we do not need to use the bootstrap metadata, the fact that
- // it specifies an obsolete metadata.version should not be a problem.
- Using.resource(new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder().
- setNumBrokerNodes(1).
- setNumControllerNodes(1).
- setBaseDirectory(baseDirectory).
- setBootstrapMetadata(oldBootstrapMetadata).
- build()).build()
- ) { cluster =>
- cluster.startup()
- cluster.waitForReadyBrokers()
- }
- }
-
- @Test
- def testIncreaseNumIoThreads(): Unit = {
- val cluster = new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder().
- setNumBrokerNodes(1).
- setNumControllerNodes(1).build()).
- setConfigProp(ServerConfigs.NUM_IO_THREADS_CONFIG, "4").
- build()
- try {
- cluster.format()
- cluster.startup()
- cluster.waitForReadyBrokers()
- val admin = cluster.admin()
- try {
- admin.incrementalAlterConfigs(
- util.Map.of(new ConfigResource(Type.BROKER, ""),
- util.List.of(new AlterConfigOp(
- new ConfigEntry(ServerConfigs.NUM_IO_THREADS_CONFIG, "8"), OpType.SET)))).all().get()
- val newTopic = util.List.of(new NewTopic("test-topic", 1, 1.toShort))
- val createTopicResult = admin.createTopics(newTopic)
- createTopicResult.all().get()
- waitForTopicListing(admin, Seq("test-topic"), Seq())
- } finally {
- admin.close()
- }
- } finally {
- cluster.close()
- }
- }
-}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index fc6e889ba3e..8ebd7848853 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -632,7 +632,7 @@ public final class QuorumController implements Controller {
}
}
- void appendControlEvent(String name, Runnable handler) {
+ public void appendControlEvent(String name, Runnable handler) {
ControllerEvent event = new ControllerEvent(name, handler);
queue.append(event);
}
diff --git a/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java b/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java
index 64eb54a2de4..f89e5089156 100644
--- a/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java
+++ b/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java
@@ -47,7 +47,11 @@ import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.ConfigResource.Type;
import org.apache.kafka.common.errors.InvalidPartitionsException;
+import org.apache.kafka.common.errors.PolicyViolationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.DescribeClusterRequestData;
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
@@ -60,10 +64,13 @@ import org.apache.kafka.common.requests.DescribeClusterResponse;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.test.KafkaClusterTestKit;
import org.apache.kafka.common.test.TestKitNodes;
+import org.apache.kafka.controller.QuorumController;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.BrokerState;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.network.SocketServerConfigs;
+import org.apache.kafka.raft.KRaftConfigs;
import org.apache.kafka.server.authorizer.AclCreateResult;
import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.apache.kafka.server.authorizer.Action;
@@ -71,12 +78,15 @@ import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.config.ReplicationConfigs;
+import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.quota.ClientQuotaCallback;
import org.apache.kafka.server.quota.ClientQuotaType;
+import org.apache.kafka.storage.internals.log.UnifiedLog;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Tag;
@@ -87,30 +97,43 @@ import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import java.util.stream.Stream;
import static org.apache.kafka.server.IntegrationTestUtils.connectAndReceive;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -1192,6 +1215,498 @@ public class KRaftClusterTest {
}
}
+ @Test
+ public void testCreateClusterAndRestartControllerNode() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(3)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ var controller = cluster.controllers().values().stream()
+ .filter(c -> c.controller().isActive())
+ .findFirst()
+ .get();
+ var port = controller.socketServer().boundPort(
+ ListenerName.normalised(controller.config().controllerListeners().head().listener()));
+
+ // shutdown active controller
+ controller.shutdown();
+ // Rewrite The `listeners` config to avoid controller socket server init using different port
+ var config = controller.sharedServer().controllerConfig().props();
+ ((Map<String, String>)
config).put(SocketServerConfigs.LISTENERS_CONFIG,
+ "CONTROLLER://localhost:" + port);
+ controller.sharedServer().controllerConfig().updateCurrentConfig(config);
+
+ // restart controller
+ controller.startup();
+ TestUtils.waitForCondition(() ->
cluster.controllers().values().stream()
+ .anyMatch(c -> c.controller().isActive()),
+ "Timeout waiting for new controller election");
+ }
+ }
+
+ @Test
+ public void testSnapshotCount() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(0)
+ .setNumControllerNodes(1)
+ .build())
+ .setConfigProp("metadata.log.max.snapshot.interval.ms", "500")
+ .setConfigProp("metadata.max.idle.interval.ms", "50") // Set this
low to generate metadata
+ .build()) {
+ cluster.format();
+ cluster.startup();
+ var metaLog = FileSystems.getDefault().getPath(
+ cluster.controllers().get(3000).config().metadataLogDir(),
+ "__cluster_metadata-0");
+ TestUtils.waitForCondition(() -> {
+ var files = metaLog.toFile().listFiles((dir, name) ->
+ name.toLowerCase(Locale.ROOT).endsWith("checkpoint")
+ );
+ return files != null && files.length > 0;
+ }, "Failed to see at least one snapshot");
+ Thread.sleep(500 * 10); // Sleep for 10 snapshot intervals
+ var filesAfterTenIntervals = metaLog.toFile().listFiles((dir, name) ->
+ name.toLowerCase(Locale.ROOT).endsWith("checkpoint")
+ );
+ int countAfterTenIntervals = filesAfterTenIntervals != null ? filesAfterTenIntervals.length : 0;
+ assertTrue(countAfterTenIntervals > 1,
+ "Expected to see at least one more snapshot, saw " +
countAfterTenIntervals);
+ assertTrue(countAfterTenIntervals < 20,
+ "Did not expect to see more than twice as many snapshots as
snapshot intervals, saw " + countAfterTenIntervals);
+ TestUtils.waitForCondition(() -> {
+ var emitterMetrics = cluster.controllers().values().iterator().next()
+ .sharedServer().snapshotEmitter().metrics();
+ return emitterMetrics.latestSnapshotGeneratedBytes() > 0;
+ }, "Failed to see latestSnapshotGeneratedBytes > 0");
+ }
+ }
+
+ /**
+ * Test a single broker, single controller cluster at the minimum bootstrap level. This tests
+ * that we can function without having periodic NoOpRecords written.
+ */
+ @Test
+ public void testSingleControllerSingleBrokerCluster() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setBootstrapMetadataVersion(MetadataVersion.MINIMUM_VERSION)
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(1)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ cluster.waitForReadyBrokers();
+ }
+ }
+
+ @Test
+ public void testOverlyLargeCreateTopics() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(1)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ try (Admin admin = cluster.admin()) {
+ var newTopics = new ArrayList<NewTopic>();
+ for (int i = 0; i <= 10000; i++) {
+ newTopics.add(new NewTopic("foo" + i, 100000, (short) 1));
+ }
+ var executionException = assertThrows(ExecutionException.class,
+ () -> admin.createTopics(newTopics).all().get());
+ assertNotNull(executionException.getCause());
+ assertEquals(PolicyViolationException.class, executionException.getCause().getClass());
+ assertEquals("Excessively large number of partitions per request.",
+ executionException.getCause().getMessage());
+ }
+ }
+ }
+
+ @Test
+ public void testTimedOutHeartbeats() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(3)
+ .setNumControllerNodes(1)
+ .build())
+ .setConfigProp(KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_CONFIG, "10")
+ .setConfigProp(KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_CONFIG, "1000")
+ .build()) {
+ cluster.format();
+ cluster.startup();
+ var controller = cluster.controllers().values().iterator().next();
+ controller.controller().waitForReadyBrokers(3).get();
+ TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+ var latch = pause((QuorumController) controller.controller());
+ Thread.sleep(1001);
+ latch.countDown();
+ assertEquals(0, controller.sharedServer().controllerServerMetrics().fencedBrokerCount());
+ assertTrue(controller.quorumControllerMetrics().timedOutHeartbeats() > 0,
+ "Expected timedOutHeartbeats to be greater than 0.");
+ });
+ }
+ }
+
+ // Duplicate method to decouple the dependency on the metadata module.
+ private CountDownLatch pause(QuorumController controller) {
+ final CountDownLatch latch = new CountDownLatch(1);
+ controller.appendControlEvent("pause", () -> {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ LOG.info("Interrupted while waiting for unpause.", e);
+ }
+ });
+ return latch;
+ }
+
+ @Test
+ public void testRegisteredControllerEndpoints() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(3)
+ .build())
+ .build()) {
+ cluster.format();
+ cluster.startup();
+ TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+ var controller = cluster.controllers().values().iterator().next();
+ var registeredControllers = controller.registrationsPublisher().controllers();
+ assertEquals(3, registeredControllers.size(), "Expected 3 controller registrations");
+ registeredControllers.values().forEach(registration -> {
+ assertNotNull(registration.listeners().get("CONTROLLER"));
+ assertNotEquals(0, registration.listeners().get("CONTROLLER").port());
+ });
+ });
+ }
+ }
+
+ @Test
+ public void testDirectToControllerCommunicationFailsOnOlderMetadataVersion() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setBootstrapMetadataVersion(MetadataVersion.IBP_3_6_IV2)
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(1)
+ .build())
+ .build()) {
+ cluster.format();
+ cluster.startup();
+ try (Admin admin = cluster.admin(Map.of(), true)) {
+ var exception = assertThrows(ExecutionException.class,
+ () -> admin.describeCluster().clusterId().get(1,
TimeUnit.MINUTES));
+ assertNotNull(exception.getCause());
+ assertEquals(UnsupportedVersionException.class,
exception.getCause().getClass());
+ }
+ }
+ }
+
+ @Test
+ public void testStartupWithNonDefaultKControllerDynamicConfiguration() throws Exception {
+ var bootstrapRecords = List.of(
+ new ApiMessageAndVersion(new FeatureLevelRecord()
+ .setName(MetadataVersion.FEATURE_NAME)
+ .setFeatureLevel(MetadataVersion.IBP_3_7_IV0.featureLevel()), (short) 0),
+ new ApiMessageAndVersion(new ConfigRecord()
+ .setResourceType(ConfigResource.Type.BROKER.id())
+ .setResourceName("")
+ .setName("num.io.threads")
+ .setValue("9"), (short) 0));
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder(BootstrapMetadata.fromRecords(bootstrapRecords, "testRecords"))
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(1)
+ .build())
+ .build()) {
+ cluster.format();
+ cluster.startup();
+ var controller = cluster.controllers().values().iterator().next();
+ TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+ assertNotNull(controller.controllerApisHandlerPool());
+ assertEquals(9, controller.controllerApisHandlerPool().threadPoolSize().get());
+ });
+ }
+ }
+
+ @Test
+ public void testTopicDeletedAndRecreatedWhileBrokerIsDown() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setBootstrapMetadataVersion(MetadataVersion.IBP_3_6_IV2)
+ .setNumBrokerNodes(3)
+ .setNumControllerNodes(1)
+ .build())
+ .build()) {
+ cluster.format();
+ cluster.startup();
+ try (Admin admin = cluster.admin()) {
+ var broker0 = cluster.brokers().get(0);
+ var broker1 = cluster.brokers().get(1);
+ var foo0 = new TopicPartition("foo", 0);
+
+ admin.createTopics(List.of(
+ new NewTopic("foo", 3, (short) 3))).all().get();
+
+ // Wait until foo-0 is created on broker0.
+ TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+ assertTrue(broker0.logManager().getLog(foo0, false).isDefined());
+ });
+
+ // Shut down broker0 and wait until the ISR of foo-0 is set to [1, 2]
+ broker0.shutdown();
+ TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+ var info = broker1.metadataCache().getLeaderAndIsr("foo", 0);
+ assertTrue(info.isPresent());
+ assertEquals(Set.of(1, 2), new HashSet<>(info.get().isr()));
+ });
+
+ // Modify foo-0 so that it has the wrong topic ID.
+ var logDir = broker0.logManager().getLog(foo0, false).get().dir();
+ var partitionMetadataFile = new File(logDir, "partition.metadata");
+ Files.write(partitionMetadataFile.toPath(),
+ "version: 0\ntopic_id:
AAAAAAAAAAAAA7SrBWaJ7g\n".getBytes(StandardCharsets.UTF_8));
+
+ // Start up broker0 and wait until the ISR of foo-0 is set to [0, 1, 2]
+ broker0.startup();
+ TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+ var info = broker1.metadataCache().getLeaderAndIsr("foo", 0);
+ assertTrue(info.isPresent());
+ assertEquals(Set.of(0, 1, 2), new HashSet<>(info.get().isr()));
+ });
+ }
+ }
+ }
+
+ @Test
+ public void testAbandonedFutureReplicaRecovered_mainReplicaInOfflineLogDir() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setBootstrapMetadataVersion(MetadataVersion.IBP_3_7_IV2)
+ .setNumBrokerNodes(3)
+ .setNumDisksPerBroker(2)
+ .setNumControllerNodes(1)
+ .build())
+ .build()) {
+ cluster.format();
+ cluster.startup();
+ try (Admin admin = cluster.admin()) {
+ var broker0 = cluster.brokers().get(0);
+ var broker1 = cluster.brokers().get(1);
+ var foo0 = new TopicPartition("foo", 0);
+
+ admin.createTopics(List.of(
+ new NewTopic("foo", 3, (short) 3))).all().get();
+
+ // Wait until foo-0 is created on broker0.
+ TestUtils.retryOnExceptionWithTimeout(60000, () ->
+ assertTrue(broker0.logManager().getLog(foo0, false).isDefined()));
+
+ // Shut down broker0 and wait until the ISR of foo-0 is set to [1, 2]
+ broker0.shutdown();
+ TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+ var info = broker1.metadataCache().getLeaderAndIsr("foo", 0);
+ assertTrue(info.isPresent());
+ assertEquals(Set.of(1, 2), new HashSet<>(info.get().isr()));
+ });
+
+ // Modify foo-0 so that it refers to a future replica.
+ // This is equivalent to a failure during the promotion of the future replica and a restart with directory for
+ // the main replica being offline
+ var log = broker0.logManager().getLog(foo0, false).get();
+ log.renameDir(UnifiedLog.logFutureDirName(foo0), false);
+
+ // Start up broker0 and wait until the ISR of foo-0 is set to [0, 1, 2]
+ broker0.startup();
+ TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+ var info = broker1.metadataCache().getLeaderAndIsr("foo", 0);
+ assertTrue(info.isPresent());
+ assertEquals(Set.of(0, 1, 2), new HashSet<>(info.get().isr()));
+ assertTrue(broker0.logManager().getLog(foo0, true).isEmpty());
+ });
+ }
+ }
+ }
+
+ @Test
+ public void testAbandonedFutureReplicaRecovered_mainReplicaInOnlineLogDir() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setBootstrapMetadataVersion(MetadataVersion.IBP_3_7_IV2)
+ .setNumBrokerNodes(3)
+ .setNumDisksPerBroker(2)
+ .setNumControllerNodes(1)
+ .build())
+ .build()) {
+ cluster.format();
+ cluster.startup();
+ try (Admin admin = cluster.admin()) {
+ var broker0 = cluster.brokers().get(0);
+ var broker1 = cluster.brokers().get(1);
+ var foo0 = new TopicPartition("foo", 0);
+
+ admin.createTopics(List.of(
+ new NewTopic("foo", 3, (short) 3))).all().get();
+
+ // Wait until foo-0 is created on broker0.
+ TestUtils.retryOnExceptionWithTimeout(60000, () ->
+ assertTrue(broker0.logManager().getLog(foo0, false).isDefined()));
+
+ // Shut down broker0 and wait until the ISR of foo-0 is set to [1, 2]
+ broker0.shutdown();
+ TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+ var info = broker1.metadataCache().getLeaderAndIsr("foo", 0);
+ assertTrue(info.isPresent());
+ assertEquals(Set.of(1, 2), new HashSet<>(info.get().isr()));
+ });
+
+ var log = broker0.logManager().getLog(foo0, false).get();
+
+ // Copy foo-0 to targetParentDir
+ // This is so that we can rename the main replica to a future down below
+ var parentDir = log.parentDir();
+ var targetParentDir = broker0.config().logDirs().stream()
+ .filter(l -> !l.equals(parentDir))
+ .findFirst()
+ .orElseThrow();
+ var targetDirFile = new File(targetParentDir, log.dir().getName());
+ targetDirFile.mkdir();
+ try (Stream<Path> stream =
Files.walk(Paths.get(log.dir().toString()))) {
+ stream.forEach(p -> {
+ var out = Paths.get(targetDirFile.toString(),
+ p.toString().substring(log.dir().toString().length()));
+ if (!p.toString().equals(log.dir().toString())) {
+ assertDoesNotThrow(() -> Files.copy(p, out));
+ }
+ });
+ }
+ assertTrue(targetDirFile.exists());
+
+ // Rename original log to a future
+ // This is equivalent to a failure during the promotion of the future replica and a restart with directory for
+ // the main replica being online
+ var originalLogFile = log.dir();
+ log.renameDir(UnifiedLog.logFutureDirName(foo0), false);
+ assertFalse(originalLogFile.exists());
+
+ // Start up broker0 and wait until the ISR of foo-0 is set to [0, 1, 2]
+ broker0.startup();
+ TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+ var info = broker1.metadataCache().getLeaderAndIsr("foo", 0);
+ assertTrue(info.isPresent());
+ assertEquals(Set.of(0, 1, 2), new HashSet<>(info.get().isr()));
+ assertTrue(broker0.logManager().getLog(foo0, true).isEmpty());
+ assertFalse(targetDirFile.exists());
+ assertTrue(originalLogFile.exists());
+ });
+ }
+ }
+ }
+
+ @Test
+ public void testControllerFailover() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(3).build()).build()) {
+ cluster.format();
+ cluster.startup();
+ cluster.waitForReadyBrokers();
+ TestUtils.waitForCondition(() ->
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+ "Broker never made it to RUNNING state.");
+ TestUtils.waitForCondition(() ->
cluster.raftManagers().get(0).client().leaderAndEpoch().leaderId().isPresent(),
+ "RaftManager was not initialized.");
+
+ try (Admin admin = cluster.admin()) {
+ // Create a test topic
+ admin.createTopics(List.of(
+ new NewTopic("test-topic", 1, (short) 1))).all().get();
+ waitForTopicListing(admin, List.of("test-topic"), List.of());
+
+ // Shut down active controller
+ var active = cluster.waitForActiveController();
+ cluster.raftManagers().get(((QuorumController) active).nodeId()).shutdown();
+
+ // Create a test topic on the new active controller
+ admin.createTopics(List.of(
+ new NewTopic("test-topic2", 1, (short) 1))).all().get();
+ waitForTopicListing(admin, List.of("test-topic2"), List.of());
+ }
+ }
+ }
+
+ /**
+ * Test that once a cluster is formatted, a bootstrap.metadata file that contains an unsupported
+ * MetadataVersion is not a problem. This is a regression test for KAFKA-19192.
+ */
+ @Test
+ public void testOldBootstrapMetadataFile() throws Exception {
+ var baseDirectory = TestUtils.tempDirectory().toPath();
+ try (var cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(1)
+ .setBaseDirectory(baseDirectory)
+ .build())
+ .setDeleteOnClose(false)
+ .build()) {
+ cluster.format();
+ cluster.startup();
+ cluster.waitForReadyBrokers();
+ }
+ var oldBootstrapMetadata = BootstrapMetadata.fromRecords(
+ List.of(
+ new ApiMessageAndVersion(
+ new FeatureLevelRecord()
+ .setName(MetadataVersion.FEATURE_NAME)
+ .setFeatureLevel((short) 1),
+ (short) 0)
+ ),
+ "oldBootstrapMetadata");
+ // Re-create the cluster using the same directory structure as above.
+ // Since we do not need to use the bootstrap metadata, the fact that
+ // it specifies an obsolete metadata.version should not be a problem.
+ try (var cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(1)
+ .setBaseDirectory(baseDirectory)
+ .setBootstrapMetadata(oldBootstrapMetadata)
+ .build()).build()) {
+ cluster.startup();
+ cluster.waitForReadyBrokers();
+ }
+ }
+
+ @Test
+ public void testIncreaseNumIoThreads() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(1).build())
+ .setConfigProp(ServerConfigs.NUM_IO_THREADS_CONFIG, "4")
+ .build()) {
+ cluster.format();
+ cluster.startup();
+ cluster.waitForReadyBrokers();
+ try (Admin admin = cluster.admin()) {
+ admin.incrementalAlterConfigs(
+ Map.of(new ConfigResource(Type.BROKER, ""),
+ List.of(new AlterConfigOp(
+ new ConfigEntry(ServerConfigs.NUM_IO_THREADS_CONFIG, "8"), OpType.SET)))).all().get();
+ var newTopic = List.of(new NewTopic("test-topic", 1, (short) 1));
+ var createTopicResult = admin.createTopics(newTopic);
+ createTopicResult.all().get();
+ waitForTopicListing(admin, List.of("test-topic"), List.of());
+ }
+ }
+ }
+
public static class BadAuthorizer implements Authorizer {
// Default constructor needed for reflection object creation
public BadAuthorizer() {