This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 67d00e25e9 MINOR: Enable some AdminClient integration tests (#12110)
67d00e25e9 is described below

commit 67d00e25e941f73be8b959c6732ac4db1d1083bf
Author: dengziming <[email protected]>
AuthorDate: Thu May 19 00:39:26 2022 +0800

    MINOR: Enable some AdminClient integration tests (#12110)
    
    Enable KRaft in `AdminClientWithPoliciesIntegrationTes`t and 
`PlaintextAdminIntegrationTest`. There are some tests not enabled or not as 
expected yet:
    
    - testNullConfigs, see KAFKA-13863
    - testDescribeCluster and testMetadataRefresh, currently we don't get the 
real controller in KRaft mode so the test may not run as expected
    
    This patch also changes the exception type raised from invalid 
`IncrementalAlterConfig` requests with the `SUBTRACT` and `APPEND` operations. 
When the configuration value type is not a list, we now raise `INVALID_CONFIG` 
instead of `INVALID_REQUEST`.
    
    Reviewers: Luke Chen <[email protected]>, Jason Gustafson 
<[email protected]>
---
 .../scala/kafka/server/ConfigAdminManager.scala    |   4 +-
 .../AdminClientWithPoliciesIntegrationTest.scala   |  16 +-
 .../kafka/api/PlaintextAdminIntegrationTest.scala  | 558 ++++++++++++---------
 3 files changed, 324 insertions(+), 254 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ConfigAdminManager.scala 
b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
index e7d6c33ab2..cc7a98179d 100644
--- a/core/src/main/scala/kafka/server/ConfigAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
@@ -494,7 +494,7 @@ object ConfigAdminManager {
         case OpType.DELETE => 
configProps.remove(alterConfigOp.configEntry.name)
         case OpType.APPEND => {
           if (!listType(alterConfigOp.configEntry.name, configKeys))
-            throw new InvalidRequestException(s"Config value append is not 
allowed for config key: ${alterConfigOp.configEntry.name}")
+            throw new InvalidConfigurationException(s"Config value append is 
not allowed for config key: ${alterConfigOp.configEntry.name}")
           val oldValueList = 
Option(configProps.getProperty(alterConfigOp.configEntry.name))
             
.orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue,
 ConfigDef.Type.LIST)))
             .getOrElse("")
@@ -505,7 +505,7 @@ object ConfigAdminManager {
         }
         case OpType.SUBTRACT => {
           if (!listType(alterConfigOp.configEntry.name, configKeys))
-            throw new InvalidRequestException(s"Config value subtract is not 
allowed for config key: ${alterConfigOp.configEntry.name}")
+            throw new InvalidConfigurationException(s"Config value subtract is 
not allowed for config key: ${alterConfigOp.configEntry.name}")
           val oldValueList = 
Option(configProps.getProperty(alterConfigOp.configEntry.name))
             
.orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue,
 ConfigDef.Type.LIST)))
             .getOrElse("")
diff --git 
a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
index fb1b0d248d..c9d40cadb0 100644
--- 
a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
@@ -15,18 +15,18 @@ package kafka.api
 
 import java.util
 import java.util.Properties
-import java.util.concurrent.ExecutionException
 
 import kafka.integration.KafkaServerTestHarness
 import kafka.log.LogConfig
 import kafka.server.{Defaults, KafkaConfig}
+import kafka.utils.TestUtils.assertFutureExceptionTypeEquals
 import kafka.utils.{Logging, TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, 
AlterConfigsOptions, Config, ConfigEntry}
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
 import org.apache.kafka.common.errors.{InvalidConfigurationException, 
InvalidRequestException, PolicyViolationException}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.server.policy.AlterConfigPolicy
-import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, 
assertThrows, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNull}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
@@ -143,10 +143,10 @@ class AdminClientWithPoliciesIntegrationTest extends 
KafkaServerTestHarness with
     ).asJava)
 
     assertEquals(Set(topicResource1, topicResource2, topicResource3, 
brokerResource).asJava, alterResult.values.keySet)
-    assertTrue(assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException])
+    assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1), 
classOf[PolicyViolationException])
     alterResult.values.get(topicResource2).get
-    assertTrue(assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topicResource3).get).getCause.isInstanceOf[InvalidConfigurationException])
-    assertTrue(assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
+    assertFutureExceptionTypeEquals(alterResult.values.get(topicResource3), 
classOf[InvalidConfigurationException])
+    assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource), 
classOf[InvalidRequestException])
 
     // Verify that the second resource was updated and the others were not
     ensureConsistentKRaftMetadata()
@@ -172,10 +172,10 @@ class AdminClientWithPoliciesIntegrationTest extends 
KafkaServerTestHarness with
     ).asJava, new AlterConfigsOptions().validateOnly(true))
 
     assertEquals(Set(topicResource1, topicResource2, topicResource3, 
brokerResource).asJava, alterResult.values.keySet)
-    assertTrue(assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException])
+    assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1), 
classOf[PolicyViolationException])
     alterResult.values.get(topicResource2).get
-    assertTrue(assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topicResource3).get).getCause.isInstanceOf[InvalidConfigurationException])
-    assertTrue(assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
+    assertFutureExceptionTypeEquals(alterResult.values.get(topicResource3), 
classOf[InvalidConfigurationException])
+    assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource), 
classOf[InvalidRequestException])
 
     // Verify that no resources are updated since validate_only = true
     ensureConsistentKRaftMetadata()
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 543b3b80cd..46cf3c9c4f 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -46,7 +46,7 @@ import org.apache.kafka.common.resource.{PatternType, 
ResourcePattern, ResourceT
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{ConsumerGroupState, ElectionType, 
TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica, 
Uuid}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 import org.slf4j.LoggerFactory
@@ -87,15 +87,17 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     super.tearDown()
   }
 
-  @Test
-  def testClose(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testClose(quorum: String): Unit = {
     val client = Admin.create(createConfig)
     client.close()
     client.close() // double close has no effect
   }
 
-  @Test
-  def testListNodes(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testListNodes(quorum: String): Unit = {
     client = Admin.create(createConfig)
     val brokerStrs = bootstrapServers().split(",").toList.sorted
     var nodeStrs: List[String] = null
@@ -106,8 +108,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     assertEquals(brokerStrs.mkString(","), nodeStrs.mkString(","))
   }
 
-  @Test
-  def testAdminClientHandlingBadIPWithoutTimeout(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAdminClientHandlingBadIPWithoutTimeout(quorum: String): Unit = {
     val config = createConfig
     config.put(AdminClientConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, 
"1000")
     val returnBadAddressFirst = new HostResolver {
@@ -120,8 +123,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     client.describeCluster().nodes().get()
   }
 
-  @Test
-  def testCreateExistingTopicsThrowTopicExistsException(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateExistingTopicsThrowTopicExistsException(quorum: String): Unit 
= {
     client = Admin.create(createConfig)
     val topic = "mytopic"
     val topics = Seq(topic)
@@ -130,14 +134,15 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     client.createTopics(newTopics.asJava).all.get()
     waitForTopics(client, topics, List())
 
-    val newTopicsWithInvalidRF = Seq(new NewTopic(topic, 1, (servers.size + 
1).toShort))
+    val newTopicsWithInvalidRF = Seq(new NewTopic(topic, 1, (brokers.size + 
1).toShort))
     val e = assertThrows(classOf[ExecutionException],
       () => client.createTopics(newTopicsWithInvalidRF.asJava, new 
CreateTopicsOptions().validateOnly(true)).all.get())
     assertTrue(e.getCause.isInstanceOf[TopicExistsException])
   }
 
-  @Test
-  def testDeleteTopicsWithIds(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteTopicsWithIds(quorum: String): Unit = {
     client = Admin.create(createConfig)
     val topics = Seq("mytopic", "mytopic2", "mytopic3")
     val newTopics = Seq(
@@ -154,15 +159,16 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     waitForTopics(client, List(), topics)
   }
 
-  @Test
-  def testMetadataRefresh(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk")) // KRaft mode will be supported in 
KAFKA-13910
+  def testMetadataRefresh(quorum: String): Unit = {
     client = Admin.create(createConfig)
     val topics = Seq("mytopic")
     val newTopics = Seq(new NewTopic("mytopic", 3, 3.toShort))
     client.createTopics(newTopics.asJava).all.get()
     waitForTopics(client, expectedPresent = topics, expectedMissing = List())
 
-    val controller = servers.find(_.config.brokerId == 
TestUtils.waitUntilControllerElected(zkClient)).get
+    val controller = brokers.find(_.config.brokerId == 
brokers.flatMap(_.metadataCache.getControllerId).head).get
     controller.shutdown()
     controller.awaitShutdown()
     val topicDesc = client.describeTopics(topics.asJava).allTopicNames.get()
@@ -172,8 +178,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
   /**
     * describe should not auto create topics
     */
-  @Test
-  def testDescribeNonExistingTopic(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeNonExistingTopic(quorum: String): Unit = {
     client = Admin.create(createConfig)
 
     val existingTopic = "existing-topic"
@@ -183,18 +190,23 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     val nonExistingTopic = "non-existing"
     val results = client.describeTopics(Seq(nonExistingTopic, 
existingTopic).asJava).topicNameValues()
     assertEquals(existingTopic, results.get(existingTopic).get.name)
-    assertThrows(classOf[ExecutionException], () => 
results.get(nonExistingTopic).get).getCause.isInstanceOf[UnknownTopicOrPartitionException]
-    assertEquals(None, zkClient.getTopicPartitionCount(nonExistingTopic))
+    assertFutureExceptionTypeEquals(results.get(nonExistingTopic), 
classOf[UnknownTopicOrPartitionException])
+    if (!isKRaftTest()) {
+      assertEquals(None, zkClient.getTopicPartitionCount(nonExistingTopic))
+    }
   }
 
-  @Test
-  def testDescribeTopicsWithIds(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeTopicsWithIds(quorum: String): Unit = {
     client = Admin.create(createConfig)
 
     val existingTopic = "existing-topic"
     client.createTopics(Seq(existingTopic).map(new NewTopic(_, 1, 
1.toShort)).asJava).all.get()
     waitForTopics(client, Seq(existingTopic), List())
-    val existingTopicId = 
zkClient.getTopicIdsForTopics(Set(existingTopic)).values.head
+    ensureConsistentKRaftMetadata()
+
+    val existingTopicId = brokers.head.metadataCache.getTopicId(existingTopic)
 
     val nonExistingTopicId = Uuid.randomUuid()
 
@@ -203,37 +215,48 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     assertThrows(classOf[ExecutionException], () => 
results.get(nonExistingTopicId).get).getCause.isInstanceOf[UnknownTopicIdException]
   }
 
-  @Test
-  def testDescribeCluster(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeCluster(quorum: String): Unit = {
     client = Admin.create(createConfig)
     val result = client.describeCluster
     val nodes = result.nodes.get()
     val clusterId = result.clusterId().get()
-    assertEquals(servers.head.dataPlaneRequestProcessor.clusterId, clusterId)
+    assertEquals(brokers.head.dataPlaneRequestProcessor.clusterId, clusterId)
     val controller = result.controller().get()
-    
assertEquals(servers.head.dataPlaneRequestProcessor.metadataCache.getControllerId.
-      getOrElse(MetadataResponse.NO_CONTROLLER_ID), controller.id())
-    val brokers = bootstrapServers().split(",")
-    assertEquals(brokers.size, nodes.size)
+
+    if (isKRaftTest()) {
+      // In KRaft, we return a random brokerId as the current controller.
+      val brokerIds = brokers.map(_.config.brokerId).toSet
+      assertTrue(brokerIds.contains(controller.id))
+    } else {
+      
assertEquals(brokers.head.dataPlaneRequestProcessor.metadataCache.getControllerId.
+        getOrElse(MetadataResponse.NO_CONTROLLER_ID), controller.id)
+    }
+
+    val brokerEndpoints = bootstrapServers().split(",")
+    assertEquals(brokerEndpoints.size, nodes.size)
     for (node <- nodes.asScala) {
       val hostStr = s"${node.host}:${node.port}"
-      assertTrue(brokers.contains(hostStr), s"Unknown host:port pair $hostStr 
in brokerVersionInfos")
+      assertTrue(brokerEndpoints.contains(hostStr), s"Unknown host:port pair 
$hostStr in brokerVersionInfos")
     }
   }
 
-  @Test
-  def testDescribeLogDirs(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeLogDirs(quorum: String): Unit = {
     client = Admin.create(createConfig)
     val topic = "topic"
     val leaderByPartition = createTopic(topic, numPartitions = 10)
     val partitionsByBroker = leaderByPartition.groupBy { case (_, leaderId) => 
leaderId }.map { case (k, v) =>
       k -> v.keys.toSeq
     }
-    val brokers = (0 until brokerCount).map(Integer.valueOf)
-    val logDirInfosByBroker = 
client.describeLogDirs(brokers.asJava).allDescriptions.get
+    ensureConsistentKRaftMetadata()
+    val brokerIds = (0 until brokerCount).map(Integer.valueOf)
+    val logDirInfosByBroker = 
client.describeLogDirs(brokerIds.asJava).allDescriptions.get
 
     (0 until brokerCount).foreach { brokerId =>
-      val server = servers.find(_.config.brokerId == brokerId).get
+      val server = brokers.find(_.config.brokerId == brokerId).get
       val expectedPartitions = partitionsByBroker(brokerId)
       val logDirInfos = logDirInfosByBroker.get(brokerId)
       val replicaInfos = logDirInfos.asScala.flatMap { case (_, logDirInfo) =>
@@ -249,36 +272,39 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
   }
 
-  @Test
-  def testDescribeReplicaLogDirs(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeReplicaLogDirs(quorum: String): Unit = {
     client = Admin.create(createConfig)
     val topic = "topic"
     val leaderByPartition = createTopic(topic, numPartitions = 10)
     val replicas = leaderByPartition.map { case (partition, brokerId) =>
       new TopicPartitionReplica(topic, partition, brokerId)
     }.toSeq
+    ensureConsistentKRaftMetadata()
 
     val replicaDirInfos = 
client.describeReplicaLogDirs(replicas.asJavaCollection).all.get
     replicaDirInfos.forEach { (topicPartitionReplica, replicaDirInfo) =>
-      val server = servers.find(_.config.brokerId == 
topicPartitionReplica.brokerId()).get
+      val server = brokers.find(_.config.brokerId == 
topicPartitionReplica.brokerId()).get
       val tp = new TopicPartition(topicPartitionReplica.topic(), 
topicPartitionReplica.partition())
       assertEquals(server.logManager.getLog(tp).get.dir.getParent, 
replicaDirInfo.getCurrentReplicaLogDir)
     }
   }
 
-  @Test
-  def testAlterReplicaLogDirs(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAlterReplicaLogDirs(quorum: String): Unit = {
     client = Admin.create(createConfig)
     val topic = "topic"
     val tp = new TopicPartition(topic, 0)
-    val randomNums = servers.map(server => server -> Random.nextInt(2)).toMap
+    val randomNums = brokers.map(server => server -> Random.nextInt(2)).toMap
 
     // Generate two mutually exclusive replicaAssignment
-    val firstReplicaAssignment = servers.map { server =>
+    val firstReplicaAssignment = brokers.map { server =>
       val logDir = new 
File(server.config.logDirs(randomNums(server))).getAbsolutePath
       new TopicPartitionReplica(topic, 0, server.config.brokerId) -> logDir
     }.toMap
-    val secondReplicaAssignment = servers.map { server =>
+    val secondReplicaAssignment = brokers.map { server =>
       val logDir = new File(server.config.logDirs(1 - 
randomNums(server))).getAbsolutePath
       new TopicPartitionReplica(topic, 0, server.config.brokerId) -> logDir
     }.toMap
@@ -292,14 +318,15 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
 
     createTopic(topic, replicationFactor = brokerCount)
-    servers.foreach { server =>
+    ensureConsistentKRaftMetadata()
+    brokers.foreach { server =>
       val logDir = server.logManager.getLog(tp).get.dir.getParent
       assertEquals(firstReplicaAssignment(new TopicPartitionReplica(topic, 0, 
server.config.brokerId)), logDir)
     }
 
     // Verify that replica can be moved to the specified log directory after 
the topic has been created
     client.alterReplicaLogDirs(secondReplicaAssignment.asJava, new 
AlterReplicaLogDirsOptions).all.get
-    servers.foreach { server =>
+    brokers.foreach { server =>
       TestUtils.waitUntilTrue(() => {
         val logDir = server.logManager.getLog(tp).get.dir.getParent
         secondReplicaAssignment(new TopicPartitionReplica(topic, 0, 
server.config.brokerId)) == logDir
@@ -332,7 +359,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     try {
       TestUtils.waitUntilTrue(() => numMessages.get > 10, s"only $numMessages 
messages are produced before timeout. Producer future ${producerFuture.value}")
       client.alterReplicaLogDirs(firstReplicaAssignment.asJava, new 
AlterReplicaLogDirsOptions).all.get
-      servers.foreach { server =>
+      brokers.foreach { server =>
         TestUtils.waitUntilTrue(() => {
           val logDir = server.logManager.getLog(tp).get.dir.getParent
           firstReplicaAssignment(new TopicPartitionReplica(topic, 0, 
server.config.brokerId)) == logDir
@@ -347,7 +374,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     val finalNumMessages = Await.result(producerFuture, Duration(20, 
TimeUnit.SECONDS))
 
     // Verify that all messages that are produced can be consumed
-    val consumerRecords = TestUtils.consumeTopicRecords(servers, topic, 
finalNumMessages,
+    val consumerRecords = TestUtils.consumeTopicRecords(brokers, topic, 
finalNumMessages,
       securityProtocol = securityProtocol, trustStoreFile = trustStoreFile)
     consumerRecords.zipWithIndex.foreach { case (consumerRecord, index) =>
       assertEquals(s"xxxxxxxxxxxxxxxxxxxx-$index", new 
String(consumerRecord.value))
@@ -697,8 +724,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
   }
 
-  @Test
-  def testSeekAfterDeleteRecords(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSeekAfterDeleteRecords(quorum: String): Unit = {
     createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)
 
     client = Admin.create(createConfig)
@@ -726,8 +754,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     assertEquals(10L, consumer.position(topicPartition))
   }
 
-  @Test
-  def testLogStartOffsetCheckpoint(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testLogStartOffsetCheckpoint(quorum: String): Unit = {
     createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)
 
     client = Admin.create(createConfig)
@@ -765,8 +794,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }, s"Expected low watermark of the partition to be 5 but got 
${lowWatermark.getOrElse("no response within the timeout")}")
   }
 
-  @Test
-  def testLogStartOffsetAfterDeleteRecords(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testLogStartOffsetAfterDeleteRecords(quorum: String): Unit = {
     createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)
 
     client = Admin.create(createConfig)
@@ -782,25 +812,26 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     assertEquals(3L, lowWatermark)
 
     for (i <- 0 until brokerCount)
-      assertEquals(3, 
servers(i).replicaManager.localLog(topicPartition).get.logStartOffset)
+      assertEquals(3, 
brokers(i).replicaManager.localLog(topicPartition).get.logStartOffset)
   }
 
-  @Test
-  def testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(quorum: String): 
Unit = {
     val leaders = createTopic(topic, replicationFactor = brokerCount)
-    val followerIndex = if (leaders(0) != servers(0).config.brokerId) 0 else 1
+    val followerIndex = if (leaders(0) != brokers(0).config.brokerId) 0 else 1
 
     def waitForFollowerLog(expectedStartOffset: Long, expectedEndOffset: 
Long): Unit = {
-      TestUtils.waitUntilTrue(() => 
servers(followerIndex).replicaManager.localLog(topicPartition) != None,
+      TestUtils.waitUntilTrue(() => 
brokers(followerIndex).replicaManager.localLog(topicPartition) != None,
                               "Expected follower to create replica for 
partition")
 
       // wait until the follower discovers that log start offset moved beyond 
its HW
       TestUtils.waitUntilTrue(() => {
-        
servers(followerIndex).replicaManager.localLog(topicPartition).get.logStartOffset
 == expectedStartOffset
+        
brokers(followerIndex).replicaManager.localLog(topicPartition).get.logStartOffset
 == expectedStartOffset
       }, s"Expected follower to discover new log start offset 
$expectedStartOffset")
 
       TestUtils.waitUntilTrue(() => {
-        
servers(followerIndex).replicaManager.localLog(topicPartition).get.logEndOffset 
== expectedEndOffset
+        
brokers(followerIndex).replicaManager.localLog(topicPartition).get.logEndOffset 
== expectedEndOffset
       }, s"Expected follower to catch up to log end offset $expectedEndOffset")
     }
 
@@ -821,7 +852,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
 
     // after the new replica caught up, all replicas should have same log 
start offset
     for (i <- 0 until brokerCount)
-      assertEquals(3, 
servers(i).replicaManager.localLog(topicPartition).get.logStartOffset)
+      assertEquals(3, 
brokers(i).replicaManager.localLog(topicPartition).get.logStartOffset)
 
     // kill the same follower again, produce more records, and delete records 
beyond follower's LOE
     killBroker(followerIndex)
@@ -832,8 +863,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     waitForFollowerLog(expectedStartOffset=117L, expectedEndOffset=200L)
   }
 
-  @Test
-  def testAlterLogDirsAfterDeleteRecords(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAlterLogDirsAfterDeleteRecords(quorum: String): Unit = {
     client = Admin.create(createConfig)
     createTopic(topic, replicationFactor = brokerCount)
     val expectedLEO = 100
@@ -845,27 +877,28 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     result.all().get()
     // make sure we are in the expected state after delete records
     for (i <- 0 until brokerCount) {
-      assertEquals(3, 
servers(i).replicaManager.localLog(topicPartition).get.logStartOffset)
-      assertEquals(expectedLEO, 
servers(i).replicaManager.localLog(topicPartition).get.logEndOffset)
+      assertEquals(3, 
brokers(i).replicaManager.localLog(topicPartition).get.logStartOffset)
+      assertEquals(expectedLEO, 
brokers(i).replicaManager.localLog(topicPartition).get.logEndOffset)
     }
 
     // we will create another dir just for one server
-    val futureLogDir = servers(0).config.logDirs(1)
-    val futureReplica = new TopicPartitionReplica(topic, 0, 
servers(0).config.brokerId)
+    val futureLogDir = brokers(0).config.logDirs(1)
+    val futureReplica = new TopicPartitionReplica(topic, 0, 
brokers(0).config.brokerId)
 
     // Verify that replica can be moved to the specified log directory
     client.alterReplicaLogDirs(Map(futureReplica -> 
futureLogDir).asJava).all.get
     TestUtils.waitUntilTrue(() => {
-      futureLogDir == 
servers(0).logManager.getLog(topicPartition).get.dir.getParent
+      futureLogDir == 
brokers(0).logManager.getLog(topicPartition).get.dir.getParent
     }, "timed out waiting for replica movement")
 
     // once replica moved, its LSO and LEO should match other replicas
-    assertEquals(3, 
servers.head.replicaManager.localLog(topicPartition).get.logStartOffset)
-    assertEquals(expectedLEO, 
servers.head.replicaManager.localLog(topicPartition).get.logEndOffset)
+    assertEquals(3, 
brokers.head.replicaManager.localLog(topicPartition).get.logStartOffset)
+    assertEquals(expectedLEO, 
brokers.head.replicaManager.localLog(topicPartition).get.logEndOffset)
   }
 
-  @Test
-  def testOffsetsForTimesAfterDeleteRecords(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testOffsetsForTimesAfterDeleteRecords(quorum: String): Unit = {
     createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)
 
     client = Admin.create(createConfig)
@@ -886,8 +919,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     assertNull(consumer.offsetsForTimes(Map(topicPartition -> 
JLong.valueOf(0L)).asJava).get(topicPartition))
   }
 
-  @Test
-  def testConsumeAfterDeleteRecords(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testConsumeAfterDeleteRecords(quorum: String): Unit = {
     val consumer = createConsumer()
     subscribeAndWaitForAssignment(topic, consumer)
 
@@ -909,8 +943,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     TestUtils.consumeRecords(consumer, 2)
   }
 
-  @Test
-  def testDeleteRecordsWithException(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteRecordsWithException(quorum: String): Unit = {
     val consumer = createConsumer()
     subscribeAndWaitForAssignment(topic, consumer)
 
@@ -934,8 +969,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     assertEquals(classOf[LeaderNotAvailableException], cause.getClass)
   }
 
-  @Test
-  def testDescribeConfigsForTopic(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeConfigsForTopic(quorum: String): Unit = {
     createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)
     client = Admin.create(createConfig)
 
@@ -982,8 +1018,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
    * Also see [[kafka.api.SaslSslAdminIntegrationTest.testAclOperations()]] 
for tests of ACL operations
    * when the authorizer is enabled.
    */
-  @Test
-  def testAclOperations(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAclOperations(quorum: String): Unit = {
     val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, 
"mytopic3", PatternType.LITERAL),
       new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, 
AclPermissionType.ALLOW))
     client = Admin.create(createConfig)
@@ -998,8 +1035,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     * Test closing the AdminClient with a generous timeout.  Calls in progress 
should be completed,
     * since they can be done within the timeout.  New calls should receive 
timeouts.
     */
-  @Test
-  def testDelayedClose(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDelayedClose(quorum: String): Unit = {
     client = Admin.create(createConfig)
     val topics = Seq("mytopic", "mytopic2")
     val newTopics = topics.map(new NewTopic(_, 1, 1.toShort))
@@ -1015,8 +1053,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     * Test closing the AdminClient with a timeout of 0, when there are calls 
with extremely long
     * timeouts in progress.  The calls should be aborted after the hard 
shutdown timeout elapses.
     */
-  @Test
-  def testForceClose(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testForceClose(quorum: String): Unit = {
     val config = createConfig
     config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
s"localhost:${TestUtils.IncorrectBrokerPort}")
     client = Admin.create(config)
@@ -1032,8 +1071,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     * Check that a call with a timeout does not complete before the minimum 
timeout has elapsed,
     * even when the default request timeout is shorter.
     */
-  @Test
-  def testMinimumRequestTimeouts(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testMinimumRequestTimeouts(quorum: String): Unit = {
     val config = createConfig
     config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
s"localhost:${TestUtils.IncorrectBrokerPort}")
     config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "0")
@@ -1049,8 +1089,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
   /**
     * Test injecting timeouts for calls that are in flight.
     */
-  @Test
-  def testCallInFlightTimeouts(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCallInFlightTimeouts(quorum: String): Unit = {
     val config = createConfig
     config.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100000000")
     config.put(AdminClientConfig.RETRIES_CONFIG, "0")
@@ -1068,8 +1109,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
   /**
    * Test the consumer group APIs.
    */
-  @Test
-  def testConsumerGroups(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testConsumerGroups(quorum: String): Unit = {
     val config = createConfig
     client = Admin.create(config)
     try {
@@ -1287,8 +1329,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
   }
 
-  @Test
-  def testDeleteConsumerGroupOffsets(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteConsumerGroupOffsets(quorum: String): Unit = {
     val config = createConfig
     client = Admin.create(config)
     try {
@@ -1359,8 +1402,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
   }
 
-  @Test
-  def testElectPreferredLeaders(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testElectPreferredLeaders(quorum: String): Unit = {
     client = Admin.create(createConfig)
 
     val prefer0 = Seq(0, 1, 2)
@@ -1368,10 +1412,10 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     val prefer2 = Seq(2, 0, 1)
 
     val partition1 = new TopicPartition("elect-preferred-leaders-topic-1", 0)
-    TestUtils.createTopic(zkClient, partition1.topic, Map[Int, 
Seq[Int]](partition1.partition -> prefer0), servers)
+    createTopicWithAssignment(partition1.topic, Map[Int, 
Seq[Int]](partition1.partition -> prefer0))
 
     val partition2 = new TopicPartition("elect-preferred-leaders-topic-2", 0)
-    TestUtils.createTopic(zkClient, partition2.topic, Map[Int, 
Seq[Int]](partition2.partition -> prefer0), servers)
+    createTopicWithAssignment(partition2.topic, Map[Int, 
Seq[Int]](partition2.partition -> prefer0))
 
     def preferredLeader(topicPartition: TopicPartition): Int = {
       val partitionMetadata = getTopicMetadata(client, 
topicPartition.topic).partitions.get(topicPartition.partition)
@@ -1380,19 +1424,18 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
 
     /** Changes the <i>preferred</i> leader without changing the 
<i>current</i> leader. */
-    def changePreferredLeader(newAssignment: Seq[Int]) = {
+    def changePreferredLeader(newAssignment: Seq[Int]): Unit = {
       val preferred = newAssignment.head
-      val prior1 = zkClient.getLeaderForPartition(partition1).get
-      val prior2 = zkClient.getLeaderForPartition(partition2).get
-
-      var m = Map.empty[TopicPartition, Seq[Int]]
+      val prior1 = 
brokers.head.metadataCache.getPartitionLeaderEndpoint(partition1.topic, 
partition1.partition(), listenerName).get.id()
+      val prior2 = 
brokers.head.metadataCache.getPartitionLeaderEndpoint(partition2.topic, 
partition2.partition(), listenerName).get.id()
 
+      var m = Map.empty[TopicPartition, Optional[NewPartitionReassignment]]
       if (prior1 != preferred)
-        m += partition1 -> newAssignment
+        m += partition1 -> Optional.of(new 
NewPartitionReassignment(newAssignment.map(Int.box).asJava))
       if (prior2 != preferred)
-        m += partition2 -> newAssignment
+        m += partition2 -> Optional.of(new 
NewPartitionReassignment(newAssignment.map(Int.box).asJava))
+      client.alterPartitionReassignments(m.asJava).all().get()
 
-      zkClient.createPartitionReassignment(m)
       TestUtils.waitUntilTrue(
         () => preferredLeader(partition1) == preferred && 
preferredLeader(partition2) == preferred,
         s"Expected preferred leader to become $preferred, but is 
${preferredLeader(partition1)} and ${preferredLeader(partition2)}",
@@ -1408,7 +1451,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
 
     // Noop election
     var electResult = client.electLeaders(ElectionType.PREFERRED, 
Set(partition1).asJava)
-    var exception = electResult.partitions.get.get(partition1).get
+    val exception = electResult.partitions.get.get(partition1).get
     assertEquals(classOf[ElectionNotNeededException], exception.getClass)
     TestUtils.assertLeader(client, partition1, 0)
 
@@ -1437,13 +1480,24 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     assertFalse(electResult.partitions.get.get(partition2).isPresent)
     TestUtils.assertLeader(client, partition2, 1)
 
+    def assertUnknownTopicOrPartition(
+      topicPartition: TopicPartition,
+      result: ElectLeadersResult
+    ): Unit = {
+      val exception = result.partitions.get.get(topicPartition).get
+      assertEquals(classOf[UnknownTopicOrPartitionException], 
exception.getClass)
+      if (isKRaftTest()) {
+        assertEquals(s"No such topic as ${topicPartition.topic()}", 
exception.getMessage)
+      } else {
+        assertEquals("The partition does not exist.", exception.getMessage)
+      }
+    }
+
     // unknown topic
     val unknownPartition = new TopicPartition("topic-does-not-exist", 0)
     electResult = client.electLeaders(ElectionType.PREFERRED, 
Set(unknownPartition).asJava)
     assertEquals(Set(unknownPartition).asJava, 
electResult.partitions.get.keySet)
-    exception = electResult.partitions.get.get(unknownPartition).get
-    assertEquals(classOf[UnknownTopicOrPartitionException], exception.getClass)
-    assertEquals("The partition does not exist.", exception.getMessage)
+    assertUnknownTopicOrPartition(unknownPartition, electResult)
     TestUtils.assertLeader(client, partition1, 1)
     TestUtils.assertLeader(client, partition2, 1)
 
@@ -1455,9 +1509,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     assertEquals(Set(unknownPartition, partition1).asJava, 
electResult.partitions.get.keySet)
     TestUtils.assertLeader(client, partition1, 2)
     TestUtils.assertLeader(client, partition2, 1)
-    exception = electResult.partitions.get.get(unknownPartition).get
-    assertEquals(classOf[UnknownTopicOrPartitionException], exception.getClass)
-    assertEquals("The partition does not exist.", exception.getMessage)
+    assertUnknownTopicOrPartition(unknownPartition, electResult)
 
     // elect preferred leader for partition 2
     electResult = client.electLeaders(ElectionType.PREFERRED, 
Set(partition2).asJava)
@@ -1468,41 +1520,48 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     // Now change the preferred leader to 1
     changePreferredLeader(prefer1)
     // but shut it down...
-    servers(1).shutdown()
+    brokers(1).shutdown()
     TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2), 
Set(1))
 
+    def assertPreferredLeaderNotAvailable(
+      topicPartition: TopicPartition,
+      result: ElectLeadersResult
+    ): Unit = {
+      val exception = result.partitions.get.get(topicPartition).get
+      assertEquals(classOf[PreferredLeaderNotAvailableException], 
exception.getClass)
+      if (isKRaftTest()) {
+        assertTrue(exception.getMessage.contains(
+          "The preferred leader was not available."),
+          s"Unexpected message: ${exception.getMessage}")
+      } else {
+        assertTrue(exception.getMessage.contains(
+          s"Failed to elect leader for partition $topicPartition under 
strategy PreferredReplicaPartitionLeaderElectionStrategy"),
+          s"Unexpected message: ${exception.getMessage}")
+      }
+    }
+
     // ... now what happens if we try to elect the preferred leader and it's 
down?
     val shortTimeout = new ElectLeadersOptions().timeoutMs(10000)
     electResult = client.electLeaders(ElectionType.PREFERRED, 
Set(partition1).asJava, shortTimeout)
     assertEquals(Set(partition1).asJava, electResult.partitions.get.keySet)
-    exception = electResult.partitions.get.get(partition1).get
-    assertEquals(classOf[PreferredLeaderNotAvailableException], 
exception.getClass)
-    assertTrue(exception.getMessage.contains(
-      "Failed to elect leader for partition elect-preferred-leaders-topic-1-0 
under strategy PreferredReplicaPartitionLeaderElectionStrategy"),
-      s"Wrong message ${exception.getMessage}")
+
+    assertPreferredLeaderNotAvailable(partition1, electResult)
     TestUtils.assertLeader(client, partition1, 2)
 
     // preferred leader unavailable with null argument
     electResult = client.electLeaders(ElectionType.PREFERRED, null, 
shortTimeout)
+    assertTrue(Set(partition1, 
partition2).subsetOf(electResult.partitions.get.keySet.asScala))
 
-    exception = electResult.partitions.get.get(partition1).get
-    assertEquals(classOf[PreferredLeaderNotAvailableException], 
exception.getClass)
-    assertTrue(exception.getMessage.contains(
-      "Failed to elect leader for partition elect-preferred-leaders-topic-1-0 
under strategy PreferredReplicaPartitionLeaderElectionStrategy"),
-      s"Wrong message ${exception.getMessage}")
-
-    exception = electResult.partitions.get.get(partition2).get
-    assertEquals(classOf[PreferredLeaderNotAvailableException], 
exception.getClass)
-    assertTrue(exception.getMessage.contains(
-      "Failed to elect leader for partition elect-preferred-leaders-topic-2-0 
under strategy PreferredReplicaPartitionLeaderElectionStrategy"),
-      s"Wrong message ${exception.getMessage}")
-
+    assertPreferredLeaderNotAvailable(partition1, electResult)
     TestUtils.assertLeader(client, partition1, 2)
+
+    assertPreferredLeaderNotAvailable(partition2, electResult)
     TestUtils.assertLeader(client, partition2, 2)
   }
 
-  @Test
-  def testElectUncleanLeadersForOnePartition(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testElectUncleanLeadersForOnePartition(quorum: String): Unit = {
     // Case: unclean leader election with one topic partition
     client = Admin.create(createConfig)
 
@@ -1511,23 +1570,24 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     val assignment1 = Seq(broker1, broker2)
 
     val partition1 = new TopicPartition("unclean-test-topic-1", 0)
-    TestUtils.createTopic(zkClient, partition1.topic, Map[Int, 
Seq[Int]](partition1.partition -> assignment1), servers)
+    createTopicWithAssignment(partition1.topic, Map[Int, 
Seq[Int]](partition1.partition -> assignment1))
 
     TestUtils.assertLeader(client, partition1, broker1)
 
-    servers(broker2).shutdown()
+    brokers(broker2).shutdown()
     TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
-    servers(broker1).shutdown()
+    brokers(broker1).shutdown()
     TestUtils.assertNoLeader(client, partition1)
-    servers(broker2).startup()
+    brokers(broker2).startup()
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, 
Set(partition1).asJava)
     assertFalse(electResult.partitions.get.get(partition1).isPresent)
     TestUtils.assertLeader(client, partition1, broker2)
   }
 
-  @Test
-  def testElectUncleanLeadersForManyPartitions(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testElectUncleanLeadersForManyPartitions(quorum: String): Unit = {
     // Case: unclean leader election with many topic partitions
     client = Admin.create(createConfig)
 
@@ -1540,22 +1600,20 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     val partition1 = new TopicPartition(topic, 0)
     val partition2 = new TopicPartition(topic, 1)
 
-    TestUtils.createTopic(
-      zkClient,
+    createTopicWithAssignment(
       topic,
-      Map(partition1.partition -> assignment1, partition2.partition -> 
assignment2),
-      servers
+      Map(partition1.partition -> assignment1, partition2.partition -> 
assignment2)
     )
 
     TestUtils.assertLeader(client, partition1, broker1)
     TestUtils.assertLeader(client, partition2, broker1)
 
-    servers(broker2).shutdown()
+    brokers(broker2).shutdown()
     TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2), 
Set(broker2))
-    servers(broker1).shutdown()
+    brokers(broker1).shutdown()
     TestUtils.assertNoLeader(client, partition1)
     TestUtils.assertNoLeader(client, partition2)
-    servers(broker2).startup()
+    brokers(broker2).startup()
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, 
Set(partition1, partition2).asJava)
     assertFalse(electResult.partitions.get.get(partition1).isPresent)
@@ -1564,8 +1622,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     TestUtils.assertLeader(client, partition2, broker2)
   }
 
-  @Test
-  def testElectUncleanLeadersForAllPartitions(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testElectUncleanLeadersForAllPartitions(quorum: String): Unit = {
     // Case: noop unclean leader election and valid unclean leader election 
for all partitions
     client = Admin.create(createConfig)
 
@@ -1579,22 +1638,20 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     val partition1 = new TopicPartition(topic, 0)
     val partition2 = new TopicPartition(topic, 1)
 
-    TestUtils.createTopic(
-      zkClient,
+    createTopicWithAssignment(
       topic,
-      Map(partition1.partition -> assignment1, partition2.partition -> 
assignment2),
-      servers
+      Map(partition1.partition -> assignment1, partition2.partition -> 
assignment2)
     )
 
     TestUtils.assertLeader(client, partition1, broker1)
     TestUtils.assertLeader(client, partition2, broker1)
 
-    servers(broker2).shutdown()
+    brokers(broker2).shutdown()
     TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
-    servers(broker1).shutdown()
+    brokers(broker1).shutdown()
     TestUtils.assertNoLeader(client, partition1)
     TestUtils.assertLeader(client, partition2, broker3)
-    servers(broker2).startup()
+    brokers(broker2).startup()
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, null)
     assertFalse(electResult.partitions.get.get(partition1).isPresent)
@@ -1603,8 +1660,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     TestUtils.assertLeader(client, partition2, broker3)
   }
 
-  @Test
-  def testElectUncleanLeadersForUnknownPartitions(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testElectUncleanLeadersForUnknownPartitions(quorum: String): Unit = {
     // Case: unclean leader election for unknown topic
     client = Admin.create(createConfig)
 
@@ -1616,11 +1674,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     val unknownPartition = new TopicPartition(topic, 1)
     val unknownTopic = new TopicPartition("unknown-topic", 0)
 
-    TestUtils.createTopic(
-      zkClient,
+    createTopicWithAssignment(
       topic,
-      Map(0 -> assignment1),
-      servers
+      Map(0 -> assignment1)
     )
 
     TestUtils.assertLeader(client, new TopicPartition(topic, 0), broker1)
@@ -1630,8 +1686,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     
assertTrue(electResult.partitions.get.get(unknownTopic).get.isInstanceOf[UnknownTopicOrPartitionException])
   }
 
-  @Test
-  def testElectUncleanLeadersWhenNoLiveBrokers(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testElectUncleanLeadersWhenNoLiveBrokers(quorum: String): Unit = {
     // Case: unclean leader election with no live brokers
     client = Admin.create(createConfig)
 
@@ -1642,26 +1699,25 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     val topic = "unclean-test-topic-1"
     val partition1 = new TopicPartition(topic, 0)
 
-    TestUtils.createTopic(
-      zkClient,
+    createTopicWithAssignment(
       topic,
-      Map(partition1.partition -> assignment1),
-      servers
+      Map(partition1.partition -> assignment1)
     )
 
     TestUtils.assertLeader(client, partition1, broker1)
 
-    servers(broker2).shutdown()
+    brokers(broker2).shutdown()
     TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
-    servers(broker1).shutdown()
+    brokers(broker1).shutdown()
     TestUtils.assertNoLeader(client, partition1)
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, 
Set(partition1).asJava)
     
assertTrue(electResult.partitions.get.get(partition1).get.isInstanceOf[EligibleLeadersNotAvailableException])
   }
 
-  @Test
-  def testElectUncleanLeadersNoop(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testElectUncleanLeadersNoop(quorum: String): Unit = {
     // Case: noop unclean leader election with explicit topic partitions
     client = Admin.create(createConfig)
 
@@ -1672,25 +1728,24 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     val topic = "unclean-test-topic-1"
     val partition1 = new TopicPartition(topic, 0)
 
-    TestUtils.createTopic(
-      zkClient,
+    createTopicWithAssignment(
       topic,
-      Map(partition1.partition -> assignment1),
-      servers
+      Map(partition1.partition -> assignment1)
     )
 
     TestUtils.assertLeader(client, partition1, broker1)
 
-    servers(broker1).shutdown()
+    brokers(broker1).shutdown()
     TestUtils.assertLeader(client, partition1, broker2)
-    servers(broker1).startup()
+    brokers(broker1).startup()
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, 
Set(partition1).asJava)
     
assertTrue(electResult.partitions.get.get(partition1).get.isInstanceOf[ElectionNotNeededException])
   }
 
-  @Test
-  def testElectUncleanLeadersAndNoop(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testElectUncleanLeadersAndNoop(quorum: String): Unit = {
     // Case: one noop unclean leader election and one valid unclean leader 
election
     client = Admin.create(createConfig)
 
@@ -1704,22 +1759,20 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     val partition1 = new TopicPartition(topic, 0)
     val partition2 = new TopicPartition(topic, 1)
 
-    TestUtils.createTopic(
-      zkClient,
+    createTopicWithAssignment(
       topic,
-      Map(partition1.partition -> assignment1, partition2.partition -> 
assignment2),
-      servers
+      Map(partition1.partition -> assignment1, partition2.partition -> 
assignment2)
     )
 
     TestUtils.assertLeader(client, partition1, broker1)
     TestUtils.assertLeader(client, partition2, broker1)
 
-    servers(broker2).shutdown()
+    brokers(broker2).shutdown()
     TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
-    servers(broker1).shutdown()
+    brokers(broker1).shutdown()
     TestUtils.assertNoLeader(client, partition1)
     TestUtils.assertLeader(client, partition2, broker3)
-    servers(broker2).startup()
+    brokers(broker2).startup()
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, 
Set(partition1, partition2).asJava)
     assertFalse(electResult.partitions.get.get(partition1).isPresent)
@@ -1728,8 +1781,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     TestUtils.assertLeader(client, partition2, broker3)
   }
 
-  @Test
-  def testListReassignmentsDoesNotShowNonReassigningPartitions(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testListReassignmentsDoesNotShowNonReassigningPartitions(quorum: 
String): Unit = {
     client = Admin.create(createConfig)
 
     // Create topics
@@ -1744,8 +1798,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     assertEquals(0, allReassignmentsMap.size())
   }
 
-  @Test
-  def testListReassignmentsDoesNotShowDeletedPartitions(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testListReassignmentsDoesNotShowDeletedPartitions(quorum: String): Unit 
= {
     client = Admin.create(createConfig)
 
     val topic = "list-reassignments-no-reassignments"
@@ -1925,8 +1980,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     assertEquals(appendValues, 
configs.get(topicResource).get(LogConfig.LeaderReplicationThrottledReplicasProp).value)
   }
 
-  @Test
-  def testIncrementalAlterConfigsDeleteAndSetBrokerConfigs(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testIncrementalAlterConfigsDeleteAndSetBrokerConfigs(quorum: String): 
Unit = {
     client = Admin.create(createConfig)
     val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0")
     client.incrementalAlterConfigs(Map(broker0Resource ->
@@ -1937,9 +1993,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       ).asJavaCollection).asJava).all().get()
     TestUtils.waitUntilTrue(() => {
       val broker0Configs = client.describeConfigs(Seq(broker0Resource).asJava).
-        all().get().get(broker0Resource).entries().asScala.map {
-        case entry => (entry.name, entry.value)
-      }.toMap
+        all().get().get(broker0Resource).entries().asScala.map(entry => 
(entry.name, entry.value)).toMap
       
("123".equals(broker0Configs.getOrElse(DynamicConfig.Broker.LeaderReplicationThrottledRateProp,
 "")) &&
         
"456".equals(broker0Configs.getOrElse(DynamicConfig.Broker.FollowerReplicationThrottledRateProp,
 "")))
     }, "Expected to see the broker properties we just set", pause=25)
@@ -1953,17 +2007,16 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       ).asJavaCollection).asJava).all().get()
     TestUtils.waitUntilTrue(() => {
       val broker0Configs = client.describeConfigs(Seq(broker0Resource).asJava).
-        all().get().get(broker0Resource).entries().asScala.map {
-        case entry => (entry.name, entry.value)
-      }.toMap
+        all().get().get(broker0Resource).entries().asScala.map(entry => 
(entry.name, entry.value)).toMap
       
("".equals(broker0Configs.getOrElse(DynamicConfig.Broker.LeaderReplicationThrottledRateProp,
 "")) &&
         
"654".equals(broker0Configs.getOrElse(DynamicConfig.Broker.FollowerReplicationThrottledRateProp,
 "")) &&
         
"987".equals(broker0Configs.getOrElse(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp,
 "")))
     }, "Expected to see the broker properties we just modified", pause=25)
   }
 
-  @Test
-  def testIncrementalAlterConfigsDeleteBrokerConfigs(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testIncrementalAlterConfigsDeleteBrokerConfigs(quorum: String): Unit = {
     client = Admin.create(createConfig)
     val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0")
     client.incrementalAlterConfigs(Map(broker0Resource ->
@@ -1976,9 +2029,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       ).asJavaCollection).asJava).all().get()
     TestUtils.waitUntilTrue(() => {
       val broker0Configs = client.describeConfigs(Seq(broker0Resource).asJava).
-        all().get().get(broker0Resource).entries().asScala.map {
-        case entry => (entry.name, entry.value)
-      }.toMap
+        all().get().get(broker0Resource).entries().asScala.map(entry => 
(entry.name, entry.value)).toMap
       
("123".equals(broker0Configs.getOrElse(DynamicConfig.Broker.LeaderReplicationThrottledRateProp,
 "")) &&
         
"456".equals(broker0Configs.getOrElse(DynamicConfig.Broker.FollowerReplicationThrottledRateProp,
 "")) &&
         
"789".equals(broker0Configs.getOrElse(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp,
 "")))
@@ -1993,17 +2044,16 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       ).asJavaCollection).asJava).all().get()
     TestUtils.waitUntilTrue(() => {
       val broker0Configs = client.describeConfigs(Seq(broker0Resource).asJava).
-        all().get().get(broker0Resource).entries().asScala.map {
-        case entry => (entry.name, entry.value)
-      }.toMap
+        all().get().get(broker0Resource).entries().asScala.map(entry => 
(entry.name, entry.value)).toMap
       
("".equals(broker0Configs.getOrElse(DynamicConfig.Broker.LeaderReplicationThrottledRateProp,
 "")) &&
         
"".equals(broker0Configs.getOrElse(DynamicConfig.Broker.FollowerReplicationThrottledRateProp,
 "")) &&
         
"".equals(broker0Configs.getOrElse(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp,
 "")))
     }, "Expected to see the broker properties we just removed to be deleted", 
pause=25)
   }
 
-  @Test
-  def testInvalidIncrementalAlterConfigs(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testInvalidIncrementalAlterConfigs(quorum: String): Unit = {
     client = Admin.create(createConfig)
 
     // Create topics
@@ -2015,14 +2065,14 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     val topic2Resource = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
     createTopic(topic2)
 
-    //Add duplicate Keys for topic1
+    // Add duplicate Keys for topic1
     var topic1AlterConfigs = Seq(
       new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, 
"0.75"), AlterConfigOp.OpType.SET),
       new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, 
"0.65"), AlterConfigOp.OpType.SET),
       new AlterConfigOp(new ConfigEntry(LogConfig.CompressionTypeProp, 
"gzip"), AlterConfigOp.OpType.SET) // valid entry
     ).asJavaCollection
 
-    //Add valid config for topic2
+    // Add valid config for topic2
     var topic2AlterConfigs = Seq(
       new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, 
"0.9"), AlterConfigOp.OpType.SET)
     ).asJavaCollection
@@ -2033,12 +2083,13 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     ).asJava)
     assertEquals(Set(topic1Resource, topic2Resource).asJava, 
alterResult.values.keySet)
 
-    //InvalidRequestException error for topic1
+    // InvalidRequestException error for topic1
     assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), 
classOf[InvalidRequestException],
       Some("Error due to duplicate config keys"))
 
-    //operation should succeed for topic2
+    // Operation should succeed for topic2
     alterResult.values().get(topic2Resource).get()
+    ensureConsistentKRaftMetadata()
 
     // Verify that topic1 is not config not updated, and topic2 config is 
updated
     val describeResult = client.describeConfigs(Seq(topic1Resource, 
topic2Resource).asJava)
@@ -2046,10 +2097,10 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     assertEquals(2, configs.size)
 
     assertEquals(Defaults.LogCleanerMinCleanRatio.toString, 
configs.get(topic1Resource).get(LogConfig.MinCleanableDirtyRatioProp).value)
-    assertEquals(Defaults.CompressionType.toString, 
configs.get(topic1Resource).get(LogConfig.CompressionTypeProp).value)
+    assertEquals(Defaults.CompressionType, 
configs.get(topic1Resource).get(LogConfig.CompressionTypeProp).value)
     assertEquals("0.9", 
configs.get(topic2Resource).get(LogConfig.MinCleanableDirtyRatioProp).value)
 
-    //check invalid use of append/subtract operation types
+    // Check invalid use of append/subtract operation types
     topic1AlterConfigs = Seq(
       new AlterConfigOp(new ConfigEntry(LogConfig.CompressionTypeProp, 
"gzip"), AlterConfigOp.OpType.APPEND)
     ).asJavaCollection
@@ -2064,14 +2115,21 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     ).asJava)
     assertEquals(Set(topic1Resource, topic2Resource).asJava, 
alterResult.values.keySet)
 
-    assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), 
classOf[InvalidRequestException],
-      Some("Config value append is not allowed for config"))
-
-    assertFutureExceptionTypeEquals(alterResult.values().get(topic2Resource), 
classOf[InvalidRequestException],
-      Some("Config value subtract is not allowed for config"))
+    assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), 
classOf[InvalidConfigurationException],
+      if (isKRaftTest()) {
+        Some("Can't APPEND to key compression.type because its type is not 
LIST.")
+      } else {
+        Some("Config value append is not allowed for config")
+      })
 
+    assertFutureExceptionTypeEquals(alterResult.values().get(topic2Resource), 
classOf[InvalidConfigurationException],
+      if (isKRaftTest()) {
+        Some("Can't SUBTRACT to key compression.type because its type is not 
LIST.")
+      } else {
+        Some("Config value subtract is not allowed for config")
+      })
 
-    //try to add invalid config
+    // Try to add invalid config
     topic1AlterConfigs = Seq(
       new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, 
"1.1"), AlterConfigOp.OpType.SET)
     ).asJavaCollection
@@ -2082,11 +2140,16 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     assertEquals(Set(topic1Resource).asJava, alterResult.values.keySet)
 
     assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), 
classOf[InvalidConfigurationException],
-      Some("Invalid config value for resource"))
+      if (isKRaftTest()) {
+        Some("Invalid value 1.1 for configuration min.cleanable.dirty.ratio: 
Value must be no more than 1")
+      } else {
+        Some("Invalid config value for resource")
+      })
   }
 
-  @Test
-  def testInvalidAlterPartitionReassignments(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testInvalidAlterPartitionReassignments(quorum: String): Unit = {
     client = Admin.create(createConfig)
     val topic = "alter-reassignments-topic-1"
     val tp1 = new TopicPartition(topic, 0)
@@ -2124,8 +2187,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp3), 
classOf[InvalidReplicaAssignmentException])
   }
 
-  @Test
-  def testLongTopicNames(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testLongTopicNames(quorum: String): Unit = {
     val client = Admin.create(createConfig)
     val longTopicName = String.join("", Collections.nCopies(249, "x"));
     val invalidTopicName = String.join("", Collections.nCopies(250, "x"));
@@ -2137,14 +2201,15 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     assertTrue(results.containsKey(invalidTopicName))
     assertFutureExceptionTypeEquals(results.get(invalidTopicName), 
classOf[InvalidTopicException])
     assertFutureExceptionTypeEquals(client.alterReplicaLogDirs(
-      Map(new TopicPartitionReplica(longTopicName, 0, 0) -> 
servers(0).config.logDirs(0)).asJava).all(),
+      Map(new TopicPartitionReplica(longTopicName, 0, 0) -> 
brokers(0).config.logDirs(0)).asJava).all(),
       classOf[InvalidTopicException])
     client.close()
   }
 
   // Verify that createTopics and alterConfigs fail with null values
-  @Test
-  def testNullConfigs(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk"))
+  def testNullConfigs(quorum: String): Unit = {
 
     def validateLogConfig(compressionType: String): Unit = {
       val logConfig = zkClient.getLogConfigs(Set(topic), 
Collections.emptyMap[String, AnyRef])._1(topic)
@@ -2182,8 +2247,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     validateLogConfig(compressionType = "producer")
   }
 
-  @Test
-  def testDescribeConfigsForLog4jLogLevels(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeConfigsForLog4jLogLevels(quorum: String): Unit = {
     client = Admin.create(createConfig)
     LoggerFactory.getLogger("kafka.cluster.Replica").trace("Message to create 
the logger")
     val loggerConfig = describeBrokerLoggers()
@@ -2198,9 +2264,10 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     assertTrue(logCleanerLogLevelConfig.synonyms().isEmpty)
   }
 
-  @Test
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
   @Disabled // To be re-enabled once KAFKA-8779 is resolved
-  def testIncrementalAlterConfigsForLog4jLogLevels(): Unit = {
+  def testIncrementalAlterConfigsForLog4jLogLevels(quorum: String): Unit = {
     client = Admin.create(createConfig)
 
     val initialLoggerConfig = describeBrokerLoggers()
@@ -2262,9 +2329,10 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     * 4. Change ROOT logger to ERROR
     * 5. Ensure the kafka.controller.KafkaController logger's level is ERROR 
(the curent root logger level)
     */
-  @Test
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
   @Disabled // To be re-enabled once KAFKA-8779 is resolved
-  def 
testIncrementalAlterConfigsForLog4jLogLevelsCanResetLoggerToCurrentRoot(): Unit 
= {
+  def 
testIncrementalAlterConfigsForLog4jLogLevelsCanResetLoggerToCurrentRoot(quorum: 
String): Unit = {
     client = Admin.create(createConfig)
     // step 1 - configure root logger
     val initialRootLogLevel = LogLevelConfig.TRACE_LOG_LEVEL
@@ -2304,9 +2372,10 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     assertEquals(newRootLogLevel, 
newRootLoggerConfig.get("kafka.controller.KafkaController").value())
   }
 
-  @Test
-  @Disabled // To be re-enabled once KAFKA-8779 is resolved
-  def testIncrementalAlterConfigsForLog4jLogLevelsCannotResetRootLogger(): 
Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  @Disabled // Zk to be re-enabled once KAFKA-8779 is resolved
+  def 
testIncrementalAlterConfigsForLog4jLogLevelsCannotResetRootLogger(quorum: 
String): Unit = {
     client = Admin.create(createConfig)
     val deleteRootLoggerEntry = Seq(
       new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, ""), 
AlterConfigOp.OpType.DELETE)
@@ -2315,9 +2384,10 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     assertTrue(assertThrows(classOf[ExecutionException], () => 
alterBrokerLoggers(deleteRootLoggerEntry)).getCause.isInstanceOf[InvalidRequestException])
   }
 
-  @Test
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
   @Disabled // To be re-enabled once KAFKA-8779 is resolved
-  def 
testIncrementalAlterConfigsForLog4jLogLevelsDoesNotWorkWithInvalidConfigs(): 
Unit = {
+  def 
testIncrementalAlterConfigsForLog4jLogLevelsDoesNotWorkWithInvalidConfigs(quorum:
 String): Unit = {
     client = Admin.create(createConfig)
     val validLoggerName = "kafka.server.KafkaRequestHandler"
     val expectedValidLoggerLogLevel = 
describeBrokerLoggers().get(validLoggerName)
@@ -2359,9 +2429,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     * The AlterConfigs API is deprecated and should not support altering log 
levels
     */
   @nowarn("cat=deprecation")
-  @Test
-  @Disabled // To be re-enabled once KAFKA-8779 is resolved
-  def testAlterConfigsForLog4jLogLevelsDoesNotWork(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("kraft")) // Zk to be re-enabled once 
KAFKA-8779 is resolved
+  def testAlterConfigsForLog4jLogLevelsDoesNotWork(quorum: String): Unit = {
     client = Admin.create(createConfig)
 
     val alterLogLevelsEntries = Seq(
@@ -2591,9 +2661,9 @@ object PlaintextAdminIntegrationTest {
     ).asJava)
 
     assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, 
alterResult.values.keySet)
-    assertTrue(assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topicResource1).get).getCause.isInstanceOf[InvalidConfigurationException])
+    assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1), 
classOf[InvalidConfigurationException])
     alterResult.values.get(topicResource2).get
-    assertTrue(assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
+    assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource), 
classOf[InvalidRequestException])
 
     // Verify that first and third resources were not updated and second was 
updated
     test.ensureConsistentKRaftMetadata()
@@ -2620,9 +2690,9 @@ object PlaintextAdminIntegrationTest {
     ).asJava, new AlterConfigsOptions().validateOnly(true))
 
     assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, 
alterResult.values.keySet)
-    assertTrue(assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topicResource1).get).getCause.isInstanceOf[InvalidConfigurationException])
+    assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1), 
classOf[InvalidConfigurationException])
     alterResult.values.get(topicResource2).get
-    assertTrue(assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
+    assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource), 
classOf[InvalidRequestException])
 
     // Verify that no resources are updated since validate_only = true
     test.ensureConsistentKRaftMetadata()

Reply via email to