This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d5df4d58a32 KAFKA-19755 Move KRaftClusterTest from core module to server module [4/4] (#21230)
d5df4d58a32 is described below

commit d5df4d58a32399711efdde2d134fe49489d8b6c3
Author: Lan Ding <[email protected]>
AuthorDate: Fri Jan 9 21:53:59 2026 +0800

    KAFKA-19755 Move KRaftClusterTest from core module to server module [4/4] (#21230)
    
    Move KRaftClusterTest from core module to server module.
    Rewrite the following tests:
    - testCreateClusterAndRestartControllerNode
    - testSnapshotCount
    - testSingleControllerSingleBrokerCluster
    - testOverlyLargeCreateTopics
    - testTimedOutHeartbeats
    - testRegisteredControllerEndpoints
    - testDirectToControllerCommunicationFailsOnOlderMetadataVersion
    - testStartupWithNonDefaultKControllerDynamicConfiguration
    - testTopicDeletedAndRecreatedWhileBrokerIsDown
    - testAbandonedFutureReplicaRecovered_mainReplicaInOfflineLogDir
    - testAbandonedFutureReplicaRecovered_mainReplicaInOnlineLogDir
    - testControllerFailover
    - testOldBootstrapMetadataFile
    - testIncreaseNumIoThreads
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 checkstyle/import-control-server.xml               |   1 +
 core/src/main/scala/kafka/server/KafkaConfig.scala |   5 +
 .../kafka/server/KRaftClusterTest.scala            | 583 ---------------------
 .../apache/kafka/controller/QuorumController.java  |   2 +-
 .../org/apache/kafka/server/KRaftClusterTest.java  | 515 ++++++++++++++++++
 5 files changed, 522 insertions(+), 584 deletions(-)
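
The rewritten tests follow one consistent shape throughout the diff below: the Scala try/finally blocks around KafkaClusterTestKit become Java try-with-resources, and kafka.utils.TestUtils.waitUntilTrue/retry calls become org.apache.kafka.test.TestUtils.waitForCondition/retryOnExceptionWithTimeout. As a quick orientation, here is a minimal example of that shape, condensed from testSingleControllerSingleBrokerCluster in the patch:

    @Test
    public void testSingleControllerSingleBrokerCluster() throws Exception {
        // KafkaClusterTestKit is AutoCloseable, so try-with-resources replaces
        // the Scala try { ... } finally { cluster.close() } pattern.
        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
            new TestKitNodes.Builder()
                .setBootstrapMetadataVersion(MetadataVersion.MINIMUM_VERSION)
                .setNumBrokerNodes(1)
                .setNumControllerNodes(1)
                .build()).build()) {
            cluster.format();
            cluster.startup();
            cluster.waitForReadyBrokers();
        }
    }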

diff --git a/checkstyle/import-control-server.xml b/checkstyle/import-control-server.xml
index 5c99f2539aa..4b087d70e36 100644
--- a/checkstyle/import-control-server.xml
+++ b/checkstyle/import-control-server.xml
@@ -68,6 +68,7 @@
 
   <!-- utilities for testing -->
   <allow pkg="org.apache.kafka.test" />
+  <allow class="org.apache.kafka.controller.QuorumController" />
 
   <!-- persistent collection factories/non-library-specific wrappers -->
   <allow pkg="org.apache.kafka.server.immutable" exact-match="true" />
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index c469f53bdda..706a359afed 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -166,6 +166,11 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
     this.currentConfig = newConfig
   }
 
+  // for testing
+  private[server] def updateCurrentConfig(props: util.Map[_, _]): Unit = {
+    this.currentConfig = new KafkaConfig(props)
+  }
+
   override def originals: util.Map[String, AnyRef] =
     if (this eq currentConfig) super.originals else currentConfig.originals
   override def values: util.Map[String, _] =
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
deleted file mode 100644
index 24468d2395f..00000000000
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ /dev/null
@@ -1,583 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.AlterConfigOp.OpType
-import org.apache.kafka.clients.admin._
-import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.config.ConfigResource.Type
-import org.apache.kafka.common.errors.{PolicyViolationException, UnsupportedVersionException}
-import org.apache.kafka.common.metadata.{ConfigRecord, FeatureLevelRecord}
-import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.test.{KafkaClusterTestKit, TestKitNodes}
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.controller.{QuorumController, QuorumControllerIntegrationTestUtils}
-import org.apache.kafka.metadata.BrokerState
-import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
-import org.apache.kafka.network.SocketServerConfigs
-import org.apache.kafka.raft.KRaftConfigs
-import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
-import org.apache.kafka.server.config.ServerConfigs
-import org.apache.kafka.storage.internals.log.UnifiedLog
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{Tag, Test, Timeout}
-
-import java.io.File
-import java.nio.charset.StandardCharsets
-import java.nio.file.{FileSystems, Files, Path, Paths}
-import java.util
-import java.util.concurrent.{ExecutionException, TimeUnit}
-import scala.collection.{Seq, mutable}
-import scala.jdk.CollectionConverters._
-import scala.util.Using
-
-@Timeout(120)
-@Tag("integration")
-class KRaftClusterTest {
-  @Test
-  def testCreateClusterAndRestartControllerNode(): Unit = {
-    val cluster = new KafkaClusterTestKit.Builder(
-      new TestKitNodes.Builder().
-        setNumBrokerNodes(1).
-        setNumControllerNodes(3).build()).build()
-    try {
-      cluster.format()
-      cluster.startup()
-      val controller = cluster.controllers().values().iterator().asScala.filter(_.controller.isActive).next()
-      val port = controller.socketServer.boundPort(ListenerName.normalised(controller.config.controllerListeners.head.listener))
-
-      // shutdown active controller
-      controller.shutdown()
-      // Rewrite The `listeners` config to avoid controller socket server init using different port
-      val config = controller.sharedServer.controllerConfig.props
-      config.asInstanceOf[util.HashMap[String,String]].put(SocketServerConfigs.LISTENERS_CONFIG, s"CONTROLLER://localhost:$port")
-      controller.sharedServer.controllerConfig.updateCurrentConfig(new KafkaConfig(config))
-
-      // restart controller
-      controller.startup()
-      TestUtils.waitUntilTrue(() => cluster.controllers().values().iterator().asScala.exists(_.controller.isActive), "Timeout waiting for new controller election")
-    } finally {
-      cluster.close()
-    }
-  }
-
-  private def waitForTopicListing(admin: Admin,
-                                  expectedPresent: Seq[String],
-                                  expectedAbsent: Seq[String]): Unit = {
-    val topicsNotFound = new util.HashSet[String]
-    var extraTopics: mutable.Set[String] = null
-    expectedPresent.foreach(topicsNotFound.add)
-    TestUtils.waitUntilTrue(() => {
-      admin.listTopics().names().get().forEach(name => topicsNotFound.remove(name))
-      extraTopics = admin.listTopics().names().get().asScala.filter(expectedAbsent.contains(_))
-      topicsNotFound.isEmpty && extraTopics.isEmpty
-    }, s"Failed to find topic(s): ${topicsNotFound.asScala} and NOT find topic(s): $extraTopics")
-  }
-
-  @Test
-  def testSnapshotCount(): Unit = {
-    val cluster = new KafkaClusterTestKit.Builder(
-      new TestKitNodes.Builder().
-        setNumBrokerNodes(0).
-        setNumControllerNodes(1).build())
-      .setConfigProp("metadata.log.max.snapshot.interval.ms", "500")
-      .setConfigProp("metadata.max.idle.interval.ms", "50") // Set this low to 
generate metadata
-      .build()
-
-    try {
-      cluster.format()
-      cluster.startup()
-      def snapshotCounter(path: Path): Long = {
-       path.toFile.listFiles((_: File, name: String) => {
-          name.toLowerCase.endsWith("checkpoint")
-        }).length
-      }
-
-      val metaLog = FileSystems.getDefault.getPath(cluster.controllers().get(3000).config.metadataLogDir, "__cluster_metadata-0")
-      TestUtils.waitUntilTrue(() => { snapshotCounter(metaLog) > 0 }, "Failed to see at least one snapshot")
-      Thread.sleep(500 * 10) // Sleep for 10 snapshot intervals
-      val countAfterTenIntervals = snapshotCounter(metaLog)
-      assertTrue(countAfterTenIntervals > 1, s"Expected to see at least one more snapshot, saw $countAfterTenIntervals")
-      assertTrue(countAfterTenIntervals < 20, s"Did not expect to see more than twice as many snapshots as snapshot intervals, saw $countAfterTenIntervals")
-      TestUtils.waitUntilTrue(() => {
-        val emitterMetrics = cluster.controllers().values().iterator().next().
-          sharedServer.snapshotEmitter.metrics()
-        emitterMetrics.latestSnapshotGeneratedBytes() > 0
-      }, "Failed to see latestSnapshotGeneratedBytes > 0")
-    } finally {
-      cluster.close()
-    }
-  }
-
-  /**
-   * Test a single broker, single controller cluster at the minimum bootstrap level. This tests
-   * that we can function without having periodic NoOpRecords written.
-   */
-  @Test
-  def testSingleControllerSingleBrokerCluster(): Unit = {
-    val cluster = new KafkaClusterTestKit.Builder(
-      new TestKitNodes.Builder().
-        setBootstrapMetadataVersion(MetadataVersion.MINIMUM_VERSION).
-        setNumBrokerNodes(1).
-        setNumControllerNodes(1).build()).build()
-    try {
-      cluster.format()
-      cluster.startup()
-      cluster.waitForReadyBrokers()
-    } finally {
-      cluster.close()
-    }
-  }
-
-  @Test
-  def testOverlyLargeCreateTopics(): Unit = {
-    val cluster = new KafkaClusterTestKit.Builder(
-      new TestKitNodes.Builder().
-        setNumBrokerNodes(1).
-        setNumControllerNodes(1).build()).build()
-    try {
-      cluster.format()
-      cluster.startup()
-      val admin = cluster.admin()
-      try {
-        val newTopics = new util.ArrayList[NewTopic]()
-        for (i <- 0 to 10000) {
-          newTopics.add(new NewTopic("foo" + i, 100000, 1.toShort))
-        }
-        val executionException = assertThrows(classOf[ExecutionException],
-            () => admin.createTopics(newTopics).all().get())
-        assertNotNull(executionException.getCause)
-        assertEquals(classOf[PolicyViolationException], executionException.getCause.getClass)
-        assertEquals("Excessively large number of partitions per request.",
-          executionException.getCause.getMessage)
-      } finally {
-        admin.close()
-      }
-    } finally {
-      cluster.close()
-    }
-  }
-
-  @Test
-  def testTimedOutHeartbeats(): Unit = {
-    val cluster = new KafkaClusterTestKit.Builder(
-      new TestKitNodes.Builder().
-        setNumBrokerNodes(3).
-        setNumControllerNodes(1).build()).
-      setConfigProp(KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_CONFIG, 10.toString).
-      setConfigProp(KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_CONFIG, 1000.toString).
-      build()
-    try {
-      cluster.format()
-      cluster.startup()
-      val controller = cluster.controllers().values().iterator().next()
-      controller.controller.waitForReadyBrokers(3).get()
-      TestUtils.retry(60000) {
-        val latch = QuorumControllerIntegrationTestUtils.pause(controller.controller.asInstanceOf[QuorumController])
-        Thread.sleep(1001)
-        latch.countDown()
-        assertEquals(0, controller.sharedServer.controllerServerMetrics.fencedBrokerCount())
-        assertTrue(controller.quorumControllerMetrics.timedOutHeartbeats() > 0,
-          "Expected timedOutHeartbeats to be greater than 0.")
-      }
-    } finally {
-      cluster.close()
-    }
-  }
-
-  @Test
-  def testRegisteredControllerEndpoints(): Unit = {
-    val cluster = new KafkaClusterTestKit.Builder(
-      new TestKitNodes.Builder().
-        setNumBrokerNodes(1).
-        setNumControllerNodes(3).build()).
-      build()
-    try {
-      cluster.format()
-      cluster.startup()
-      TestUtils.retry(60000) {
-        val controller = cluster.controllers().values().iterator().next()
-        val registeredControllers = controller.registrationsPublisher.controllers()
-        assertEquals(3, registeredControllers.size(), "Expected 3 controller registrations")
-        registeredControllers.values().forEach(registration => {
-          assertNotNull(registration.listeners.get("CONTROLLER"))
-          assertNotEquals(0, registration.listeners.get("CONTROLLER").port())
-        })
-      }
-    } finally {
-      cluster.close()
-    }
-  }
-
-  @Test
-  def testDirectToControllerCommunicationFailsOnOlderMetadataVersion(): Unit = {
-    val cluster = new KafkaClusterTestKit.Builder(
-      new TestKitNodes.Builder().
-        setBootstrapMetadataVersion(MetadataVersion.IBP_3_6_IV2).
-        setNumBrokerNodes(1).
-        setNumControllerNodes(1).build()).
-      build()
-    try {
-      cluster.format()
-      cluster.startup()
-      val admin = cluster.admin(util.Map.of(), true)
-      try {
-        val exception = assertThrows(classOf[ExecutionException],
-          () => admin.describeCluster().clusterId().get(1, TimeUnit.MINUTES))
-        assertNotNull(exception.getCause)
-        assertEquals(classOf[UnsupportedVersionException], exception.getCause.getClass)
-      } finally {
-        admin.close()
-      }
-    } finally {
-      cluster.close()
-    }
-  }
-
-  @Test
-  def testStartupWithNonDefaultKControllerDynamicConfiguration(): Unit = {
-    val bootstrapRecords = util.List.of(
-      new ApiMessageAndVersion(new FeatureLevelRecord().
-        setName(MetadataVersion.FEATURE_NAME).
-        setFeatureLevel(MetadataVersion.IBP_3_7_IV0.featureLevel), 0.toShort),
-      new ApiMessageAndVersion(new ConfigRecord().
-        setResourceType(ConfigResource.Type.BROKER.id).
-        setResourceName("").
-        setName("num.io.threads").
-        setValue("9"), 0.toShort))
-    val cluster = new KafkaClusterTestKit.Builder(
-      new TestKitNodes.Builder(BootstrapMetadata.fromRecords(bootstrapRecords, "testRecords")).
-        setNumBrokerNodes(1).
-        setNumControllerNodes(1).build()).
-      build()
-    try {
-      cluster.format()
-      cluster.startup()
-      val controller = cluster.controllers().values().iterator().next()
-      TestUtils.retry(60000) {
-        assertNotNull(controller.controllerApisHandlerPool)
-        assertEquals(9, controller.controllerApisHandlerPool.threadPoolSize.get())
-      }
-    } finally {
-      cluster.close()
-    }
-  }
-
-  @Test
-  def testTopicDeletedAndRecreatedWhileBrokerIsDown(): Unit = {
-    val cluster = new KafkaClusterTestKit.Builder(
-      new TestKitNodes.Builder().
-        setBootstrapMetadataVersion(MetadataVersion.IBP_3_6_IV2).
-        setNumBrokerNodes(3).
-        setNumControllerNodes(1).build()).
-      build()
-    try {
-      cluster.format()
-      cluster.startup()
-      val admin = cluster.admin()
-      try {
-        val broker0 = cluster.brokers().get(0)
-        val broker1 = cluster.brokers().get(1)
-        val foo0 = new TopicPartition("foo", 0)
-
-        admin.createTopics(util.List.of(
-          new NewTopic("foo", 3, 3.toShort))).all().get()
-
-        // Wait until foo-0 is created on broker0.
-        TestUtils.retry(60000) {
-          assertTrue(broker0.logManager.getLog(foo0).isDefined)
-        }
-
-        // Shut down broker0 and wait until the ISR of foo-0 is set to [1, 2]
-        broker0.shutdown()
-        TestUtils.retry(60000) {
-          val info = broker1.metadataCache.getLeaderAndIsr("foo", 0)
-          assertTrue(info.isPresent)
-          assertEquals(Set(1, 2), info.get.isr().asScala.toSet)
-        }
-
-        // Modify foo-0 so that it has the wrong topic ID.
-        val logDir = broker0.logManager.getLog(foo0).get.dir
-        val partitionMetadataFile = new File(logDir, "partition.metadata")
-        Files.write(partitionMetadataFile.toPath,
-          "version: 0\ntopic_id: 
AAAAAAAAAAAAA7SrBWaJ7g\n".getBytes(StandardCharsets.UTF_8))
-
-        // Start up broker0 and wait until the ISR of foo-0 is set to [0, 1, 2]
-        broker0.startup()
-        TestUtils.retry(60000) {
-          val info = broker1.metadataCache.getLeaderAndIsr("foo", 0)
-          assertTrue(info.isPresent)
-          assertEquals(Set(0, 1, 2), info.get.isr().asScala.toSet)
-        }
-      } finally {
-        admin.close()
-      }
-    } finally {
-      cluster.close()
-    }
-  }
-
-  @Test
-  def testAbandonedFutureReplicaRecovered_mainReplicaInOfflineLogDir(): Unit = {
-    val cluster = new KafkaClusterTestKit.Builder(
-      new TestKitNodes.Builder().
-        setBootstrapMetadataVersion(MetadataVersion.IBP_3_7_IV2).
-        setNumBrokerNodes(3).
-        setNumDisksPerBroker(2).
-        setNumControllerNodes(1).build()).
-      build()
-    try {
-      cluster.format()
-      cluster.startup()
-      val admin = cluster.admin()
-      try {
-        val broker0 = cluster.brokers().get(0)
-        val broker1 = cluster.brokers().get(1)
-        val foo0 = new TopicPartition("foo", 0)
-
-        admin.createTopics(util.List.of(
-          new NewTopic("foo", 3, 3.toShort))).all().get()
-
-        // Wait until foo-0 is created on broker0.
-        TestUtils.retry(60000) {
-          assertTrue(broker0.logManager.getLog(foo0).isDefined)
-        }
-
-        // Shut down broker0 and wait until the ISR of foo-0 is set to [1, 2]
-        broker0.shutdown()
-        TestUtils.retry(60000) {
-          val info = broker1.metadataCache.getLeaderAndIsr("foo", 0)
-          assertTrue(info.isPresent)
-          assertEquals(Set(1, 2), info.get.isr().asScala.toSet)
-        }
-
-        // Modify foo-0 so that it refers to a future replica.
-        // This is equivalent to a failure during the promotion of the future replica and a restart with directory for
-        // the main replica being offline
-        val log = broker0.logManager.getLog(foo0).get
-        log.renameDir(UnifiedLog.logFutureDirName(foo0), false)
-
-        // Start up broker0 and wait until the ISR of foo-0 is set to [0, 1, 2]
-        broker0.startup()
-        TestUtils.retry(60000) {
-          val info = broker1.metadataCache.getLeaderAndIsr("foo", 0)
-          assertTrue(info.isPresent)
-          assertEquals(Set(0, 1, 2), info.get.isr().asScala.toSet)
-          assertTrue(broker0.logManager.getLog(foo0, isFuture = true).isEmpty)
-        }
-      } finally {
-        admin.close()
-      }
-    } finally {
-      cluster.close()
-    }
-  }
-
-  def copyDirectory(src: String, dest: String): Unit = {
-    Files.walk(Paths.get(src)).forEach(p => {
-      val out = Paths.get(dest, p.toString.substring(src.length()))
-      if (!p.toString.equals(src)) {
-        Files.copy(p, out)
-      }
-    })
-  }
-
-  @Test
-  def testAbandonedFutureReplicaRecovered_mainReplicaInOnlineLogDir(): Unit = {
-    val cluster = new KafkaClusterTestKit.Builder(
-      new TestKitNodes.Builder().
-        setBootstrapMetadataVersion(MetadataVersion.IBP_3_7_IV2).
-        setNumBrokerNodes(3).
-        setNumDisksPerBroker(2).
-        setNumControllerNodes(1).build()).
-      build()
-    try {
-      cluster.format()
-      cluster.startup()
-      val admin = cluster.admin()
-      try {
-        val broker0 = cluster.brokers().get(0)
-        val broker1 = cluster.brokers().get(1)
-        val foo0 = new TopicPartition("foo", 0)
-
-        admin.createTopics(util.List.of(
-          new NewTopic("foo", 3, 3.toShort))).all().get()
-
-        // Wait until foo-0 is created on broker0.
-        TestUtils.retry(60000) {
-          assertTrue(broker0.logManager.getLog(foo0).isDefined)
-        }
-
-        // Shut down broker0 and wait until the ISR of foo-0 is set to [1, 2]
-        broker0.shutdown()
-        TestUtils.retry(60000) {
-          val info = broker1.metadataCache.getLeaderAndIsr("foo", 0)
-          assertTrue(info.isPresent)
-          assertEquals(Set(1, 2), info.get.isr().asScala.toSet)
-        }
-
-        val log = broker0.logManager.getLog(foo0).get
-
-        // Copy foo-0 to targetParentDir
-        // This is so that we can rename the main replica to a future down below
-        val parentDir = log.parentDir
-        val targetParentDir = broker0.config.logDirs.stream().filter(l => !l.equals(parentDir)).findFirst().get()
-        val targetDirFile = new File(targetParentDir, log.dir.getName)
-        targetDirFile.mkdir()
-        copyDirectory(log.dir.toString, targetDirFile.toString)
-        assertTrue(targetDirFile.exists())
-
-        // Rename original log to a future
-        // This is equivalent to a failure during the promotion of the future replica and a restart with directory for
-        // the main replica being online
-        val originalLogFile = log.dir
-        log.renameDir(UnifiedLog.logFutureDirName(foo0), false)
-        assertFalse(originalLogFile.exists())
-
-        // Start up broker0 and wait until the ISR of foo-0 is set to [0, 1, 2]
-        broker0.startup()
-        TestUtils.retry(60000) {
-          val info = broker1.metadataCache.getLeaderAndIsr("foo", 0)
-          assertTrue(info.isPresent)
-          assertEquals(Set(0, 1, 2), info.get.isr().asScala.toSet)
-          assertTrue(broker0.logManager.getLog(foo0, isFuture = true).isEmpty)
-          assertFalse(targetDirFile.exists())
-          assertTrue(originalLogFile.exists())
-        }
-      } finally {
-        admin.close()
-      }
-    } finally {
-      cluster.close()
-    }
-  }
-
-  @Test
-  def testControllerFailover(): Unit = {
-    val cluster = new KafkaClusterTestKit.Builder(
-      new TestKitNodes.Builder().
-        setNumBrokerNodes(1).
-        setNumControllerNodes(5).build()).build()
-    try {
-      cluster.format()
-      cluster.startup()
-      cluster.waitForReadyBrokers()
-      TestUtils.waitUntilTrue(() => cluster.brokers().get(0).brokerState == BrokerState.RUNNING,
-        "Broker never made it to RUNNING state.")
-      TestUtils.waitUntilTrue(() => cluster.raftManagers().get(0).client.leaderAndEpoch().leaderId.isPresent,
-        "RaftManager was not initialized.")
-
-      val admin = cluster.admin()
-      try {
-        // Create a test topic
-        admin.createTopics(util.List.of(
-          new NewTopic("test-topic", 1, 1.toShort))).all().get()
-        waitForTopicListing(admin, Seq("test-topic"), Seq())
-
-        // Shut down active controller
-        val active = cluster.waitForActiveController()
-        cluster.raftManagers().get(active.asInstanceOf[QuorumController].nodeId()).shutdown()
-
-        // Create a test topic on the new active controller
-        admin.createTopics(util.List.of(
-          new NewTopic("test-topic2", 1, 1.toShort))).all().get()
-        waitForTopicListing(admin, Seq("test-topic2"), Seq())
-      } finally {
-        admin.close()
-      }
-    } finally {
-      cluster.close()
-    }
-  }
-
-  /**
-   * Test that once a cluster is formatted, a bootstrap.metadata file that contains an unsupported
-   * MetadataVersion is not a problem. This is a regression test for KAFKA-19192.
-   */
-  @Test
-  def testOldBootstrapMetadataFile(): Unit = {
-    val baseDirectory = TestUtils.tempDir().toPath()
-    Using.resource(new KafkaClusterTestKit.Builder(
-      new TestKitNodes.Builder().
-        setNumBrokerNodes(1).
-        setNumControllerNodes(1).
-        setBaseDirectory(baseDirectory).
-          build()).
-      setDeleteOnClose(false).
-        build()
-    ) { cluster =>
-      cluster.format()
-      cluster.startup()
-      cluster.waitForReadyBrokers()
-    }
-    val oldBootstrapMetadata = BootstrapMetadata.fromRecords(
-      util.List.of(
-        new ApiMessageAndVersion(
-          new FeatureLevelRecord().
-            setName(MetadataVersion.FEATURE_NAME).
-            setFeatureLevel(1),
-          0.toShort)
-      ),
-      "oldBootstrapMetadata")
-    // Re-create the cluster using the same directory structure as above.
-    // Since we do not need to use the bootstrap metadata, the fact that
-    // it specifies an obsolete metadata.version should not be a problem.
-    Using.resource(new KafkaClusterTestKit.Builder(
-      new TestKitNodes.Builder().
-        setNumBrokerNodes(1).
-        setNumControllerNodes(1).
-        setBaseDirectory(baseDirectory).
-        setBootstrapMetadata(oldBootstrapMetadata).
-          build()).build()
-    ) { cluster =>
-      cluster.startup()
-      cluster.waitForReadyBrokers()
-    }
-  }
-
-  @Test
-  def testIncreaseNumIoThreads(): Unit = {
-    val cluster = new KafkaClusterTestKit.Builder(
-      new TestKitNodes.Builder().
-        setNumBrokerNodes(1).
-        setNumControllerNodes(1).build()).
-      setConfigProp(ServerConfigs.NUM_IO_THREADS_CONFIG, "4").
-      build()
-    try {
-      cluster.format()
-      cluster.startup()
-      cluster.waitForReadyBrokers()
-      val admin = cluster.admin()
-      try {
-        admin.incrementalAlterConfigs(
-          util.Map.of(new ConfigResource(Type.BROKER, ""),
-            util.List.of(new AlterConfigOp(
-              new ConfigEntry(ServerConfigs.NUM_IO_THREADS_CONFIG, "8"), OpType.SET)))).all().get()
-        val newTopic = util.List.of(new NewTopic("test-topic", 1, 1.toShort))
-        val createTopicResult = admin.createTopics(newTopic)
-        createTopicResult.all().get()
-        waitForTopicListing(admin, Seq("test-topic"), Seq())
-      } finally {
-        admin.close()
-      }
-    } finally {
-      cluster.close()
-    }
-  }
-}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index fc6e889ba3e..8ebd7848853 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -632,7 +632,7 @@ public final class QuorumController implements Controller {
         }
     }
 
-    void appendControlEvent(String name, Runnable handler) {
+    public void appendControlEvent(String name, Runnable handler) {
         ControllerEvent event = new ControllerEvent(name, handler);
         queue.append(event);
     }
diff --git a/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java b/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java
index 64eb54a2de4..f89e5089156 100644
--- a/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java
+++ b/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java
@@ -47,7 +47,11 @@ import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.ConfigResource.Type;
 import org.apache.kafka.common.errors.InvalidPartitionsException;
+import org.apache.kafka.common.errors.PolicyViolationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.DescribeClusterRequestData;
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.quota.ClientQuotaAlteration;
@@ -60,10 +64,13 @@ import org.apache.kafka.common.requests.DescribeClusterResponse;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.test.KafkaClusterTestKit;
 import org.apache.kafka.common.test.TestKitNodes;
+import org.apache.kafka.controller.QuorumController;
 import org.apache.kafka.image.ClusterImage;
 import org.apache.kafka.metadata.BrokerRegistration;
 import org.apache.kafka.metadata.BrokerState;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
 import org.apache.kafka.network.SocketServerConfigs;
+import org.apache.kafka.raft.KRaftConfigs;
 import org.apache.kafka.server.authorizer.AclCreateResult;
 import org.apache.kafka.server.authorizer.AclDeleteResult;
 import org.apache.kafka.server.authorizer.Action;
@@ -71,12 +78,15 @@ import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
 import org.apache.kafka.server.authorizer.AuthorizationResult;
 import org.apache.kafka.server.authorizer.Authorizer;
 import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.KRaftVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.config.ReplicationConfigs;
+import org.apache.kafka.server.config.ServerConfigs;
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
 import org.apache.kafka.server.quota.ClientQuotaCallback;
 import org.apache.kafka.server.quota.ClientQuotaType;
+import org.apache.kafka.storage.internals.log.UnifiedLog;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.Tag;
@@ -87,30 +97,43 @@ import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 import static org.apache.kafka.server.IntegrationTestUtils.connectAndReceive;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -1192,6 +1215,498 @@ public class KRaftClusterTest {
         }
     }
 
+    @Test
+    public void testCreateClusterAndRestartControllerNode() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(1)
+                .setNumControllerNodes(3)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            var controller = cluster.controllers().values().stream()
+                .filter(c -> c.controller().isActive())
+                .findFirst()
+                .get();
+            var port = controller.socketServer().boundPort(
+                ListenerName.normalised(controller.config().controllerListeners().head().listener()));
+
+            // shutdown active controller
+            controller.shutdown();
+            // Rewrite The `listeners` config to avoid controller socket server init using different port
+            var config = controller.sharedServer().controllerConfig().props();
+            ((Map<String, String>) config).put(SocketServerConfigs.LISTENERS_CONFIG,
+                "CONTROLLER://localhost:" + port);
+            controller.sharedServer().controllerConfig().updateCurrentConfig(config);
+
+            // restart controller
+            controller.startup();
+            TestUtils.waitForCondition(() -> cluster.controllers().values().stream()
+                .anyMatch(c -> c.controller().isActive()),
+                "Timeout waiting for new controller election");
+        }
+    }
+
+    @Test
+    public void testSnapshotCount() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(0)
+                .setNumControllerNodes(1)
+                .build())
+            .setConfigProp("metadata.log.max.snapshot.interval.ms", "500")
+            .setConfigProp("metadata.max.idle.interval.ms", "50") // Set this 
low to generate metadata
+            .build()) {
+            cluster.format();
+            cluster.startup();
+            var metaLog = FileSystems.getDefault().getPath(
+                cluster.controllers().get(3000).config().metadataLogDir(),
+                "__cluster_metadata-0");
+            TestUtils.waitForCondition(() -> {
+                var files = metaLog.toFile().listFiles((dir, name) ->
+                    name.toLowerCase(Locale.ROOT).endsWith("checkpoint")
+                );
+                return files != null && files.length > 0;
+            }, "Failed to see at least one snapshot");
+            Thread.sleep(500 * 10); // Sleep for 10 snapshot intervals
+            var filesAfterTenIntervals = metaLog.toFile().listFiles((dir, 
name) ->
+                name.toLowerCase(Locale.ROOT).endsWith("checkpoint")
+            );
+            int countAfterTenIntervals = filesAfterTenIntervals != null ? 
filesAfterTenIntervals.length : 0;
+            assertTrue(countAfterTenIntervals > 1,
+                "Expected to see at least one more snapshot, saw " + 
countAfterTenIntervals);
+            assertTrue(countAfterTenIntervals < 20,
+                "Did not expect to see more than twice as many snapshots as 
snapshot intervals, saw " + countAfterTenIntervals);
+            TestUtils.waitForCondition(() -> {
+                var emitterMetrics = 
cluster.controllers().values().iterator().next()
+                    .sharedServer().snapshotEmitter().metrics();
+                return emitterMetrics.latestSnapshotGeneratedBytes() > 0;
+            }, "Failed to see latestSnapshotGeneratedBytes > 0");
+        }
+    }
+
+    /**
+     * Test a single broker, single controller cluster at the minimum bootstrap level. This tests
+     * that we can function without having periodic NoOpRecords written.
+     */
+    @Test
+    public void testSingleControllerSingleBrokerCluster() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setBootstrapMetadataVersion(MetadataVersion.MINIMUM_VERSION)
+                .setNumBrokerNodes(1)
+                .setNumControllerNodes(1)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            cluster.waitForReadyBrokers();
+        }
+    }
+
+    @Test
+    public void testOverlyLargeCreateTopics() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(1)
+                .setNumControllerNodes(1)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            try (Admin admin = cluster.admin()) {
+                var newTopics = new ArrayList<NewTopic>();
+                for (int i = 0; i <= 10000; i++) {
+                    newTopics.add(new NewTopic("foo" + i, 100000, (short) 1));
+                }
+                var executionException = assertThrows(ExecutionException.class,
+                    () -> admin.createTopics(newTopics).all().get());
+                assertNotNull(executionException.getCause());
+                assertEquals(PolicyViolationException.class, executionException.getCause().getClass());
+                assertEquals("Excessively large number of partitions per request.",
+                    executionException.getCause().getMessage());
+            }
+        }
+    }
+
+    @Test
+    public void testTimedOutHeartbeats() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(3)
+                .setNumControllerNodes(1)
+                .build())
+            .setConfigProp(KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_CONFIG, "10")
+            .setConfigProp(KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_CONFIG, "1000")
+            .build()) {
+            cluster.format();
+            cluster.startup();
+            var controller = cluster.controllers().values().iterator().next();
+            controller.controller().waitForReadyBrokers(3).get();
+            TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+                var latch = pause((QuorumController) controller.controller());
+                Thread.sleep(1001);
+                latch.countDown();
+                assertEquals(0, controller.sharedServer().controllerServerMetrics().fencedBrokerCount());
+                assertTrue(controller.quorumControllerMetrics().timedOutHeartbeats() > 0,
+                    "Expected timedOutHeartbeats to be greater than 0.");
+            });
+        }
+    }
+
+    // Duplicate method to decouple the dependency on the metadata module.
+    private CountDownLatch pause(QuorumController controller) {
+        final CountDownLatch latch = new CountDownLatch(1);
+        controller.appendControlEvent("pause", () -> {
+            try {
+                latch.await();
+            } catch (InterruptedException e) {
+                LOG.info("Interrupted while waiting for unpause.", e);
+            }
+        });
+        return latch;
+    }
+
+    @Test
+    public void testRegisteredControllerEndpoints() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(1)
+                .setNumControllerNodes(3)
+                .build())
+            .build()) {
+            cluster.format();
+            cluster.startup();
+            TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+                var controller = cluster.controllers().values().iterator().next();
+                var registeredControllers = controller.registrationsPublisher().controllers();
+                assertEquals(3, registeredControllers.size(), "Expected 3 controller registrations");
+                registeredControllers.values().forEach(registration -> {
+                    assertNotNull(registration.listeners().get("CONTROLLER"));
+                    assertNotEquals(0, registration.listeners().get("CONTROLLER").port());
+                });
+            });
+        }
+    }
+
+    @Test
+    public void testDirectToControllerCommunicationFailsOnOlderMetadataVersion() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setBootstrapMetadataVersion(MetadataVersion.IBP_3_6_IV2)
+                .setNumBrokerNodes(1)
+                .setNumControllerNodes(1)
+                .build())
+            .build()) {
+            cluster.format();
+            cluster.startup();
+            try (Admin admin = cluster.admin(Map.of(), true)) {
+                var exception = assertThrows(ExecutionException.class,
+                    () -> admin.describeCluster().clusterId().get(1, TimeUnit.MINUTES));
+                assertNotNull(exception.getCause());
+                assertEquals(UnsupportedVersionException.class, exception.getCause().getClass());
+            }
+        }
+    }
+
+    @Test
+    public void testStartupWithNonDefaultKControllerDynamicConfiguration() throws Exception {
+        var bootstrapRecords = List.of(
+            new ApiMessageAndVersion(new FeatureLevelRecord()
+                .setName(MetadataVersion.FEATURE_NAME)
+                .setFeatureLevel(MetadataVersion.IBP_3_7_IV0.featureLevel()), (short) 0),
+            new ApiMessageAndVersion(new ConfigRecord()
+                .setResourceType(ConfigResource.Type.BROKER.id())
+                .setResourceName("")
+                .setName("num.io.threads")
+                .setValue("9"), (short) 0));
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder(BootstrapMetadata.fromRecords(bootstrapRecords, "testRecords"))
+                .setNumBrokerNodes(1)
+                .setNumControllerNodes(1)
+                .build())
+            .build()) {
+            cluster.format();
+            cluster.startup();
+            var controller = cluster.controllers().values().iterator().next();
+            TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+                assertNotNull(controller.controllerApisHandlerPool());
+                assertEquals(9, controller.controllerApisHandlerPool().threadPoolSize().get());
+            });
+        }
+    }
+
+    @Test
+    public void testTopicDeletedAndRecreatedWhileBrokerIsDown() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setBootstrapMetadataVersion(MetadataVersion.IBP_3_6_IV2)
+                .setNumBrokerNodes(3)
+                .setNumControllerNodes(1)
+                .build())
+            .build()) {
+            cluster.format();
+            cluster.startup();
+            try (Admin admin = cluster.admin()) {
+                var broker0 = cluster.brokers().get(0);
+                var broker1 = cluster.brokers().get(1);
+                var foo0 = new TopicPartition("foo", 0);
+
+                admin.createTopics(List.of(
+                    new NewTopic("foo", 3, (short) 3))).all().get();
+
+                // Wait until foo-0 is created on broker0.
+                TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+                    assertTrue(broker0.logManager().getLog(foo0, false).isDefined());
+                });
+
+                // Shut down broker0 and wait until the ISR of foo-0 is set to [1, 2]
+                broker0.shutdown();
+                TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+                    var info = broker1.metadataCache().getLeaderAndIsr("foo", 0);
+                    assertTrue(info.isPresent());
+                    assertEquals(Set.of(1, 2), new HashSet<>(info.get().isr()));
+                });
+
+                // Modify foo-0 so that it has the wrong topic ID.
+                var logDir = broker0.logManager().getLog(foo0, false).get().dir();
+                var partitionMetadataFile = new File(logDir, "partition.metadata");
+                Files.write(partitionMetadataFile.toPath(),
+                    "version: 0\ntopic_id: AAAAAAAAAAAAA7SrBWaJ7g\n".getBytes(StandardCharsets.UTF_8));
+
+                // Start up broker0 and wait until the ISR of foo-0 is set to [0, 1, 2]
+                broker0.startup();
+                TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+                    var info = broker1.metadataCache().getLeaderAndIsr("foo", 0);
+                    assertTrue(info.isPresent());
+                    assertEquals(Set.of(0, 1, 2), new HashSet<>(info.get().isr()));
+                });
+            }
+        }
+    }
+
+    @Test
+    public void testAbandonedFutureReplicaRecovered_mainReplicaInOfflineLogDir() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setBootstrapMetadataVersion(MetadataVersion.IBP_3_7_IV2)
+                .setNumBrokerNodes(3)
+                .setNumDisksPerBroker(2)
+                .setNumControllerNodes(1)
+                .build())
+            .build()) {
+            cluster.format();
+            cluster.startup();
+            try (Admin admin = cluster.admin()) {
+                var broker0 = cluster.brokers().get(0);
+                var broker1 = cluster.brokers().get(1);
+                var foo0 = new TopicPartition("foo", 0);
+
+                admin.createTopics(List.of(
+                    new NewTopic("foo", 3, (short) 3))).all().get();
+
+                // Wait until foo-0 is created on broker0.
+                TestUtils.retryOnExceptionWithTimeout(60000, () -> 
+                    assertTrue(broker0.logManager().getLog(foo0, false).isDefined()));
+
+                // Shut down broker0 and wait until the ISR of foo-0 is set to [1, 2]
+                broker0.shutdown();
+                TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+                    var info = broker1.metadataCache().getLeaderAndIsr("foo", 0);
+                    assertTrue(info.isPresent());
+                    assertEquals(Set.of(1, 2), new HashSet<>(info.get().isr()));
+                });
+
+                // Modify foo-0 so that it refers to a future replica.
+                // This is equivalent to a failure during the promotion of the future replica and a restart with directory for
+                // the main replica being offline
+                var log = broker0.logManager().getLog(foo0, false).get();
+                log.renameDir(UnifiedLog.logFutureDirName(foo0), false);
+
+                // Start up broker0 and wait until the ISR of foo-0 is set to [0, 1, 2]
+                broker0.startup();
+                TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+                    var info = broker1.metadataCache().getLeaderAndIsr("foo", 0);
+                    assertTrue(info.isPresent());
+                    assertEquals(Set.of(0, 1, 2), new HashSet<>(info.get().isr()));
+                    assertTrue(broker0.logManager().getLog(foo0, true).isEmpty());
+                });
+            }
+        }
+    }
+
+    @Test
+    public void testAbandonedFutureReplicaRecovered_mainReplicaInOnlineLogDir() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setBootstrapMetadataVersion(MetadataVersion.IBP_3_7_IV2)
+                .setNumBrokerNodes(3)
+                .setNumDisksPerBroker(2)
+                .setNumControllerNodes(1)
+                .build())
+            .build()) {
+            cluster.format();
+            cluster.startup();
+            try (Admin admin = cluster.admin()) {
+                var broker0 = cluster.brokers().get(0);
+                var broker1 = cluster.brokers().get(1);
+                var foo0 = new TopicPartition("foo", 0);
+
+                admin.createTopics(List.of(
+                    new NewTopic("foo", 3, (short) 3))).all().get();
+
+                // Wait until foo-0 is created on broker0.
+                TestUtils.retryOnExceptionWithTimeout(60000, () ->
+                    assertTrue(broker0.logManager().getLog(foo0, false).isDefined()));
+
+                // Shut down broker0 and wait until the ISR of foo-0 is set to [1, 2]
+                broker0.shutdown();
+                TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+                    var info = broker1.metadataCache().getLeaderAndIsr("foo", 0);
+                    assertTrue(info.isPresent());
+                    assertEquals(Set.of(1, 2), new HashSet<>(info.get().isr()));
+                });
+
+                var log = broker0.logManager().getLog(foo0, false).get();
+
+                // Copy foo-0 to targetParentDir
+                // This is so that we can rename the main replica to a future down below
+                var parentDir = log.parentDir();
+                var targetParentDir = broker0.config().logDirs().stream()
+                    .filter(l -> !l.equals(parentDir))
+                    .findFirst()
+                    .orElseThrow();
+                var targetDirFile = new File(targetParentDir, log.dir().getName());
+                targetDirFile.mkdir();
+                try (Stream<Path> stream = Files.walk(Paths.get(log.dir().toString()))) {
+                    stream.forEach(p -> {
+                        var out = Paths.get(targetDirFile.toString(),
+                            p.toString().substring(log.dir().toString().length()));
+                        if (!p.toString().equals(log.dir().toString())) {
+                            assertDoesNotThrow(() -> Files.copy(p, out));
+                        }
+                    });
+                }
+                assertTrue(targetDirFile.exists());
+
+                // Rename original log to a future
+                // This is equivalent to a failure during the promotion of the future replica and a restart with directory for
+                // the main replica being online
+                var originalLogFile = log.dir();
+                log.renameDir(UnifiedLog.logFutureDirName(foo0), false);
+                assertFalse(originalLogFile.exists());
+
+                // Start up broker0 and wait until the ISR of foo-0 is set to [0, 1, 2]
+                broker0.startup();
+                TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+                    var info = broker1.metadataCache().getLeaderAndIsr("foo", 0);
+                    assertTrue(info.isPresent());
+                    assertEquals(Set.of(0, 1, 2), new HashSet<>(info.get().isr()));
+                    assertTrue(broker0.logManager().getLog(foo0, true).isEmpty());
+                    assertFalse(targetDirFile.exists());
+                    assertTrue(originalLogFile.exists());
+                });
+            }
+        }
+    }
+
+    @Test
+    public void testControllerFailover() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(1)
+                .setNumControllerNodes(3).build()).build()) {
+            cluster.format();
+            cluster.startup();
+            cluster.waitForReadyBrokers();
+            TestUtils.waitForCondition(() -> cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+                "Broker never made it to RUNNING state.");
+            TestUtils.waitForCondition(() -> cluster.raftManagers().get(0).client().leaderAndEpoch().leaderId().isPresent(),
+                "RaftManager was not initialized.");
+                "RaftManager was not initialized.");
+
+            try (Admin admin = cluster.admin()) {
+                // Create a test topic
+                admin.createTopics(List.of(
+                    new NewTopic("test-topic", 1, (short) 1))).all().get();
+                waitForTopicListing(admin, List.of("test-topic"), List.of());
+
+                // Shut down active controller
+                var active = cluster.waitForActiveController();
+                cluster.raftManagers().get(((QuorumController) active).nodeId()).shutdown();
+
+                // Create a test topic on the new active controller
+                admin.createTopics(List.of(
+                    new NewTopic("test-topic2", 1, (short) 1))).all().get();
+                waitForTopicListing(admin, List.of("test-topic2"), List.of());
+            }
+        }
+    }
+
+    /**
+     * Test that once a cluster is formatted, a bootstrap.metadata file that contains an unsupported
+     * MetadataVersion is not a problem. This is a regression test for KAFKA-19192.
+     */
+    @Test
+    public void testOldBootstrapMetadataFile() throws Exception {
+        var baseDirectory = TestUtils.tempDirectory().toPath();
+        try (var cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(1)
+                .setNumControllerNodes(1)
+                .setBaseDirectory(baseDirectory)
+                .build())
+            .setDeleteOnClose(false)
+            .build()) {
+            cluster.format();
+            cluster.startup();
+            cluster.waitForReadyBrokers();
+        }
+        var oldBootstrapMetadata = BootstrapMetadata.fromRecords(
+            List.of(
+                new ApiMessageAndVersion(
+                    new FeatureLevelRecord()
+                        .setName(MetadataVersion.FEATURE_NAME)
+                        .setFeatureLevel((short) 1),
+                    (short) 0)
+            ),
+            "oldBootstrapMetadata");
+        // Re-create the cluster using the same directory structure as above.
+        // Since we do not need to use the bootstrap metadata, the fact that
+        // it specifies an obsolete metadata.version should not be a problem.
+        try (var cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(1)
+                .setNumControllerNodes(1)
+                .setBaseDirectory(baseDirectory)
+                .setBootstrapMetadata(oldBootstrapMetadata)
+                .build()).build()) {
+            cluster.startup();
+            cluster.waitForReadyBrokers();
+        }
+    }
+
+    @Test
+    public void testIncreaseNumIoThreads() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(1)
+                .setNumControllerNodes(1).build())
+            .setConfigProp(ServerConfigs.NUM_IO_THREADS_CONFIG, "4")
+            .build()) {
+            cluster.format();
+            cluster.startup();
+            cluster.waitForReadyBrokers();
+            try (Admin admin = cluster.admin()) {
+                admin.incrementalAlterConfigs(
+                    Map.of(new ConfigResource(Type.BROKER, ""),
+                        List.of(new AlterConfigOp(
+                            new ConfigEntry(ServerConfigs.NUM_IO_THREADS_CONFIG, "8"), OpType.SET)))).all().get();
+                var newTopic = List.of(new NewTopic("test-topic", 1, (short) 1));
+                var createTopicResult = admin.createTopics(newTopic);
+                createTopicResult.all().get();
+                waitForTopicListing(admin, List.of("test-topic"), List.of());
+            }
+        }
+    }
+
     public static class BadAuthorizer implements Authorizer {
         // Default constructor needed for reflection object creation
         public BadAuthorizer() {
