This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 67d00e25e9 MINOR: Enable some AdminClient integration tests (#12110)
67d00e25e9 is described below
commit 67d00e25e941f73be8b959c6732ac4db1d1083bf
Author: dengziming <[email protected]>
AuthorDate: Thu May 19 00:39:26 2022 +0800
MINOR: Enable some AdminClient integration tests (#12110)
Enable KRaft in `AdminClientWithPoliciesIntegrationTest` and
`PlaintextAdminIntegrationTest`. Some tests are not enabled yet, or do not yet
behave as expected, in KRaft mode:
- testNullConfigs, see KAFKA-13863
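- testDescribeCluster and testMetadataRefresh: in KRaft mode we currently do
not get the real controller, so these tests may not run as expected
(testDescribeCluster only checks that the reported controller id belongs to a
broker, and testMetadataRefresh stays ZK-only until KAFKA-13910)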
This patch also changes the exception type raised for invalid
`IncrementalAlterConfigs` requests that use the `SUBTRACT` or `APPEND`
operations. When the configuration value type is not a list, we now raise
`INVALID_CONFIG` (`InvalidConfigurationException`) instead of
`INVALID_REQUEST` (`InvalidRequestException`).
Reviewers: Luke Chen <[email protected]>, Jason Gustafson
<[email protected]>
---
.../scala/kafka/server/ConfigAdminManager.scala | 4 +-
.../AdminClientWithPoliciesIntegrationTest.scala | 16 +-
.../kafka/api/PlaintextAdminIntegrationTest.scala | 558 ++++++++++++---------
3 files changed, 324 insertions(+), 254 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ConfigAdminManager.scala
b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
index e7d6c33ab2..cc7a98179d 100644
--- a/core/src/main/scala/kafka/server/ConfigAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
@@ -494,7 +494,7 @@ object ConfigAdminManager {
case OpType.DELETE =>
configProps.remove(alterConfigOp.configEntry.name)
case OpType.APPEND => {
if (!listType(alterConfigOp.configEntry.name, configKeys))
- throw new InvalidRequestException(s"Config value append is not
allowed for config key: ${alterConfigOp.configEntry.name}")
+ throw new InvalidConfigurationException(s"Config value append is
not allowed for config key: ${alterConfigOp.configEntry.name}")
val oldValueList =
Option(configProps.getProperty(alterConfigOp.configEntry.name))
.orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue,
ConfigDef.Type.LIST)))
.getOrElse("")
@@ -505,7 +505,7 @@ object ConfigAdminManager {
}
case OpType.SUBTRACT => {
if (!listType(alterConfigOp.configEntry.name, configKeys))
- throw new InvalidRequestException(s"Config value subtract is not
allowed for config key: ${alterConfigOp.configEntry.name}")
+ throw new InvalidConfigurationException(s"Config value subtract is
not allowed for config key: ${alterConfigOp.configEntry.name}")
val oldValueList =
Option(configProps.getProperty(alterConfigOp.configEntry.name))
.orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue,
ConfigDef.Type.LIST)))
.getOrElse("")
diff --git
a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
index fb1b0d248d..c9d40cadb0 100644
---
a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
@@ -15,18 +15,18 @@ package kafka.api
import java.util
import java.util.Properties
-import java.util.concurrent.ExecutionException
import kafka.integration.KafkaServerTestHarness
import kafka.log.LogConfig
import kafka.server.{Defaults, KafkaConfig}
+import kafka.utils.TestUtils.assertFutureExceptionTypeEquals
import kafka.utils.{Logging, TestInfoUtils, TestUtils}
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig,
AlterConfigsOptions, Config, ConfigEntry}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.errors.{InvalidConfigurationException,
InvalidRequestException, PolicyViolationException}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.policy.AlterConfigPolicy
-import org.junit.jupiter.api.Assertions.{assertEquals, assertNull,
assertThrows, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNull}
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@@ -143,10 +143,10 @@ class AdminClientWithPoliciesIntegrationTest extends
KafkaServerTestHarness with
).asJava)
assertEquals(Set(topicResource1, topicResource2, topicResource3,
brokerResource).asJava, alterResult.values.keySet)
- assertTrue(assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException])
+ assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1),
classOf[PolicyViolationException])
alterResult.values.get(topicResource2).get
- assertTrue(assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(topicResource3).get).getCause.isInstanceOf[InvalidConfigurationException])
- assertTrue(assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
+ assertFutureExceptionTypeEquals(alterResult.values.get(topicResource3),
classOf[InvalidConfigurationException])
+ assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource),
classOf[InvalidRequestException])
// Verify that the second resource was updated and the others were not
ensureConsistentKRaftMetadata()
@@ -172,10 +172,10 @@ class AdminClientWithPoliciesIntegrationTest extends
KafkaServerTestHarness with
).asJava, new AlterConfigsOptions().validateOnly(true))
assertEquals(Set(topicResource1, topicResource2, topicResource3,
brokerResource).asJava, alterResult.values.keySet)
- assertTrue(assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException])
+ assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1),
classOf[PolicyViolationException])
alterResult.values.get(topicResource2).get
- assertTrue(assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(topicResource3).get).getCause.isInstanceOf[InvalidConfigurationException])
- assertTrue(assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
+ assertFutureExceptionTypeEquals(alterResult.values.get(topicResource3),
classOf[InvalidConfigurationException])
+ assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource),
classOf[InvalidRequestException])
// Verify that no resources are updated since validate_only = true
ensureConsistentKRaftMetadata()
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 543b3b80cd..46cf3c9c4f 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -46,7 +46,7 @@ import org.apache.kafka.common.resource.{PatternType,
ResourcePattern, ResourceT
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{ConsumerGroupState, ElectionType,
TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica,
Uuid}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.slf4j.LoggerFactory
@@ -87,15 +87,17 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
- @Test
- def testClose(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testClose(quorum: String): Unit = {
val client = Admin.create(createConfig)
client.close()
client.close() // double close has no effect
}
- @Test
- def testListNodes(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testListNodes(quorum: String): Unit = {
client = Admin.create(createConfig)
val brokerStrs = bootstrapServers().split(",").toList.sorted
var nodeStrs: List[String] = null
@@ -106,8 +108,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertEquals(brokerStrs.mkString(","), nodeStrs.mkString(","))
}
- @Test
- def testAdminClientHandlingBadIPWithoutTimeout(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAdminClientHandlingBadIPWithoutTimeout(quorum: String): Unit = {
val config = createConfig
config.put(AdminClientConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG,
"1000")
val returnBadAddressFirst = new HostResolver {
@@ -120,8 +123,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
client.describeCluster().nodes().get()
}
- @Test
- def testCreateExistingTopicsThrowTopicExistsException(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreateExistingTopicsThrowTopicExistsException(quorum: String): Unit
= {
client = Admin.create(createConfig)
val topic = "mytopic"
val topics = Seq(topic)
@@ -130,14 +134,15 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
client.createTopics(newTopics.asJava).all.get()
waitForTopics(client, topics, List())
- val newTopicsWithInvalidRF = Seq(new NewTopic(topic, 1, (servers.size +
1).toShort))
+ val newTopicsWithInvalidRF = Seq(new NewTopic(topic, 1, (brokers.size +
1).toShort))
val e = assertThrows(classOf[ExecutionException],
() => client.createTopics(newTopicsWithInvalidRF.asJava, new
CreateTopicsOptions().validateOnly(true)).all.get())
assertTrue(e.getCause.isInstanceOf[TopicExistsException])
}
- @Test
- def testDeleteTopicsWithIds(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDeleteTopicsWithIds(quorum: String): Unit = {
client = Admin.create(createConfig)
val topics = Seq("mytopic", "mytopic2", "mytopic3")
val newTopics = Seq(
@@ -154,15 +159,16 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
waitForTopics(client, List(), topics)
}
- @Test
- def testMetadataRefresh(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk")) // KRaft mode will be supported in
KAFKA-13910
+ def testMetadataRefresh(quorum: String): Unit = {
client = Admin.create(createConfig)
val topics = Seq("mytopic")
val newTopics = Seq(new NewTopic("mytopic", 3, 3.toShort))
client.createTopics(newTopics.asJava).all.get()
waitForTopics(client, expectedPresent = topics, expectedMissing = List())
- val controller = servers.find(_.config.brokerId ==
TestUtils.waitUntilControllerElected(zkClient)).get
+ val controller = brokers.find(_.config.brokerId ==
brokers.flatMap(_.metadataCache.getControllerId).head).get
controller.shutdown()
controller.awaitShutdown()
val topicDesc = client.describeTopics(topics.asJava).allTopicNames.get()
@@ -172,8 +178,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
/**
* describe should not auto create topics
*/
- @Test
- def testDescribeNonExistingTopic(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeNonExistingTopic(quorum: String): Unit = {
client = Admin.create(createConfig)
val existingTopic = "existing-topic"
@@ -183,18 +190,23 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val nonExistingTopic = "non-existing"
val results = client.describeTopics(Seq(nonExistingTopic,
existingTopic).asJava).topicNameValues()
assertEquals(existingTopic, results.get(existingTopic).get.name)
- assertThrows(classOf[ExecutionException], () =>
results.get(nonExistingTopic).get).getCause.isInstanceOf[UnknownTopicOrPartitionException]
- assertEquals(None, zkClient.getTopicPartitionCount(nonExistingTopic))
+ assertFutureExceptionTypeEquals(results.get(nonExistingTopic),
classOf[UnknownTopicOrPartitionException])
+ if (!isKRaftTest()) {
+ assertEquals(None, zkClient.getTopicPartitionCount(nonExistingTopic))
+ }
}
- @Test
- def testDescribeTopicsWithIds(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeTopicsWithIds(quorum: String): Unit = {
client = Admin.create(createConfig)
val existingTopic = "existing-topic"
client.createTopics(Seq(existingTopic).map(new NewTopic(_, 1,
1.toShort)).asJava).all.get()
waitForTopics(client, Seq(existingTopic), List())
- val existingTopicId =
zkClient.getTopicIdsForTopics(Set(existingTopic)).values.head
+ ensureConsistentKRaftMetadata()
+
+ val existingTopicId = brokers.head.metadataCache.getTopicId(existingTopic)
val nonExistingTopicId = Uuid.randomUuid()
@@ -203,37 +215,48 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertThrows(classOf[ExecutionException], () =>
results.get(nonExistingTopicId).get).getCause.isInstanceOf[UnknownTopicIdException]
}
- @Test
- def testDescribeCluster(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeCluster(quorum: String): Unit = {
client = Admin.create(createConfig)
val result = client.describeCluster
val nodes = result.nodes.get()
val clusterId = result.clusterId().get()
- assertEquals(servers.head.dataPlaneRequestProcessor.clusterId, clusterId)
+ assertEquals(brokers.head.dataPlaneRequestProcessor.clusterId, clusterId)
val controller = result.controller().get()
-
assertEquals(servers.head.dataPlaneRequestProcessor.metadataCache.getControllerId.
- getOrElse(MetadataResponse.NO_CONTROLLER_ID), controller.id())
- val brokers = bootstrapServers().split(",")
- assertEquals(brokers.size, nodes.size)
+
+ if (isKRaftTest()) {
+ // In KRaft, we return a random brokerId as the current controller.
+ val brokerIds = brokers.map(_.config.brokerId).toSet
+ assertTrue(brokerIds.contains(controller.id))
+ } else {
+
assertEquals(brokers.head.dataPlaneRequestProcessor.metadataCache.getControllerId.
+ getOrElse(MetadataResponse.NO_CONTROLLER_ID), controller.id)
+ }
+
+ val brokerEndpoints = bootstrapServers().split(",")
+ assertEquals(brokerEndpoints.size, nodes.size)
for (node <- nodes.asScala) {
val hostStr = s"${node.host}:${node.port}"
- assertTrue(brokers.contains(hostStr), s"Unknown host:port pair $hostStr
in brokerVersionInfos")
+ assertTrue(brokerEndpoints.contains(hostStr), s"Unknown host:port pair
$hostStr in brokerVersionInfos")
}
}
- @Test
- def testDescribeLogDirs(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeLogDirs(quorum: String): Unit = {
client = Admin.create(createConfig)
val topic = "topic"
val leaderByPartition = createTopic(topic, numPartitions = 10)
val partitionsByBroker = leaderByPartition.groupBy { case (_, leaderId) =>
leaderId }.map { case (k, v) =>
k -> v.keys.toSeq
}
- val brokers = (0 until brokerCount).map(Integer.valueOf)
- val logDirInfosByBroker =
client.describeLogDirs(brokers.asJava).allDescriptions.get
+ ensureConsistentKRaftMetadata()
+ val brokerIds = (0 until brokerCount).map(Integer.valueOf)
+ val logDirInfosByBroker =
client.describeLogDirs(brokerIds.asJava).allDescriptions.get
(0 until brokerCount).foreach { brokerId =>
- val server = servers.find(_.config.brokerId == brokerId).get
+ val server = brokers.find(_.config.brokerId == brokerId).get
val expectedPartitions = partitionsByBroker(brokerId)
val logDirInfos = logDirInfosByBroker.get(brokerId)
val replicaInfos = logDirInfos.asScala.flatMap { case (_, logDirInfo) =>
@@ -249,36 +272,39 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
}
- @Test
- def testDescribeReplicaLogDirs(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeReplicaLogDirs(quorum: String): Unit = {
client = Admin.create(createConfig)
val topic = "topic"
val leaderByPartition = createTopic(topic, numPartitions = 10)
val replicas = leaderByPartition.map { case (partition, brokerId) =>
new TopicPartitionReplica(topic, partition, brokerId)
}.toSeq
+ ensureConsistentKRaftMetadata()
val replicaDirInfos =
client.describeReplicaLogDirs(replicas.asJavaCollection).all.get
replicaDirInfos.forEach { (topicPartitionReplica, replicaDirInfo) =>
- val server = servers.find(_.config.brokerId ==
topicPartitionReplica.brokerId()).get
+ val server = brokers.find(_.config.brokerId ==
topicPartitionReplica.brokerId()).get
val tp = new TopicPartition(topicPartitionReplica.topic(),
topicPartitionReplica.partition())
assertEquals(server.logManager.getLog(tp).get.dir.getParent,
replicaDirInfo.getCurrentReplicaLogDir)
}
}
- @Test
- def testAlterReplicaLogDirs(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAlterReplicaLogDirs(quorum: String): Unit = {
client = Admin.create(createConfig)
val topic = "topic"
val tp = new TopicPartition(topic, 0)
- val randomNums = servers.map(server => server -> Random.nextInt(2)).toMap
+ val randomNums = brokers.map(server => server -> Random.nextInt(2)).toMap
// Generate two mutually exclusive replicaAssignment
- val firstReplicaAssignment = servers.map { server =>
+ val firstReplicaAssignment = brokers.map { server =>
val logDir = new
File(server.config.logDirs(randomNums(server))).getAbsolutePath
new TopicPartitionReplica(topic, 0, server.config.brokerId) -> logDir
}.toMap
- val secondReplicaAssignment = servers.map { server =>
+ val secondReplicaAssignment = brokers.map { server =>
val logDir = new File(server.config.logDirs(1 -
randomNums(server))).getAbsolutePath
new TopicPartitionReplica(topic, 0, server.config.brokerId) -> logDir
}.toMap
@@ -292,14 +318,15 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
createTopic(topic, replicationFactor = brokerCount)
- servers.foreach { server =>
+ ensureConsistentKRaftMetadata()
+ brokers.foreach { server =>
val logDir = server.logManager.getLog(tp).get.dir.getParent
assertEquals(firstReplicaAssignment(new TopicPartitionReplica(topic, 0,
server.config.brokerId)), logDir)
}
// Verify that replica can be moved to the specified log directory after
the topic has been created
client.alterReplicaLogDirs(secondReplicaAssignment.asJava, new
AlterReplicaLogDirsOptions).all.get
- servers.foreach { server =>
+ brokers.foreach { server =>
TestUtils.waitUntilTrue(() => {
val logDir = server.logManager.getLog(tp).get.dir.getParent
secondReplicaAssignment(new TopicPartitionReplica(topic, 0,
server.config.brokerId)) == logDir
@@ -332,7 +359,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
try {
TestUtils.waitUntilTrue(() => numMessages.get > 10, s"only $numMessages
messages are produced before timeout. Producer future ${producerFuture.value}")
client.alterReplicaLogDirs(firstReplicaAssignment.asJava, new
AlterReplicaLogDirsOptions).all.get
- servers.foreach { server =>
+ brokers.foreach { server =>
TestUtils.waitUntilTrue(() => {
val logDir = server.logManager.getLog(tp).get.dir.getParent
firstReplicaAssignment(new TopicPartitionReplica(topic, 0,
server.config.brokerId)) == logDir
@@ -347,7 +374,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val finalNumMessages = Await.result(producerFuture, Duration(20,
TimeUnit.SECONDS))
// Verify that all messages that are produced can be consumed
- val consumerRecords = TestUtils.consumeTopicRecords(servers, topic,
finalNumMessages,
+ val consumerRecords = TestUtils.consumeTopicRecords(brokers, topic,
finalNumMessages,
securityProtocol = securityProtocol, trustStoreFile = trustStoreFile)
consumerRecords.zipWithIndex.foreach { case (consumerRecord, index) =>
assertEquals(s"xxxxxxxxxxxxxxxxxxxx-$index", new
String(consumerRecord.value))
@@ -697,8 +724,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
}
- @Test
- def testSeekAfterDeleteRecords(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testSeekAfterDeleteRecords(quorum: String): Unit = {
createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)
client = Admin.create(createConfig)
@@ -726,8 +754,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertEquals(10L, consumer.position(topicPartition))
}
- @Test
- def testLogStartOffsetCheckpoint(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testLogStartOffsetCheckpoint(quorum: String): Unit = {
createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)
client = Admin.create(createConfig)
@@ -765,8 +794,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}, s"Expected low watermark of the partition to be 5 but got
${lowWatermark.getOrElse("no response within the timeout")}")
}
- @Test
- def testLogStartOffsetAfterDeleteRecords(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testLogStartOffsetAfterDeleteRecords(quorum: String): Unit = {
createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)
client = Admin.create(createConfig)
@@ -782,25 +812,26 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertEquals(3L, lowWatermark)
for (i <- 0 until brokerCount)
- assertEquals(3,
servers(i).replicaManager.localLog(topicPartition).get.logStartOffset)
+ assertEquals(3,
brokers(i).replicaManager.localLog(topicPartition).get.logStartOffset)
}
- @Test
- def testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(quorum: String):
Unit = {
val leaders = createTopic(topic, replicationFactor = brokerCount)
- val followerIndex = if (leaders(0) != servers(0).config.brokerId) 0 else 1
+ val followerIndex = if (leaders(0) != brokers(0).config.brokerId) 0 else 1
def waitForFollowerLog(expectedStartOffset: Long, expectedEndOffset:
Long): Unit = {
- TestUtils.waitUntilTrue(() =>
servers(followerIndex).replicaManager.localLog(topicPartition) != None,
+ TestUtils.waitUntilTrue(() =>
brokers(followerIndex).replicaManager.localLog(topicPartition) != None,
"Expected follower to create replica for
partition")
// wait until the follower discovers that log start offset moved beyond
its HW
TestUtils.waitUntilTrue(() => {
-
servers(followerIndex).replicaManager.localLog(topicPartition).get.logStartOffset
== expectedStartOffset
+
brokers(followerIndex).replicaManager.localLog(topicPartition).get.logStartOffset
== expectedStartOffset
}, s"Expected follower to discover new log start offset
$expectedStartOffset")
TestUtils.waitUntilTrue(() => {
-
servers(followerIndex).replicaManager.localLog(topicPartition).get.logEndOffset
== expectedEndOffset
+
brokers(followerIndex).replicaManager.localLog(topicPartition).get.logEndOffset
== expectedEndOffset
}, s"Expected follower to catch up to log end offset $expectedEndOffset")
}
@@ -821,7 +852,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
// after the new replica caught up, all replicas should have same log
start offset
for (i <- 0 until brokerCount)
- assertEquals(3,
servers(i).replicaManager.localLog(topicPartition).get.logStartOffset)
+ assertEquals(3,
brokers(i).replicaManager.localLog(topicPartition).get.logStartOffset)
// kill the same follower again, produce more records, and delete records
beyond follower's LOE
killBroker(followerIndex)
@@ -832,8 +863,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
waitForFollowerLog(expectedStartOffset=117L, expectedEndOffset=200L)
}
- @Test
- def testAlterLogDirsAfterDeleteRecords(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAlterLogDirsAfterDeleteRecords(quorum: String): Unit = {
client = Admin.create(createConfig)
createTopic(topic, replicationFactor = brokerCount)
val expectedLEO = 100
@@ -845,27 +877,28 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
result.all().get()
// make sure we are in the expected state after delete records
for (i <- 0 until brokerCount) {
- assertEquals(3,
servers(i).replicaManager.localLog(topicPartition).get.logStartOffset)
- assertEquals(expectedLEO,
servers(i).replicaManager.localLog(topicPartition).get.logEndOffset)
+ assertEquals(3,
brokers(i).replicaManager.localLog(topicPartition).get.logStartOffset)
+ assertEquals(expectedLEO,
brokers(i).replicaManager.localLog(topicPartition).get.logEndOffset)
}
// we will create another dir just for one server
- val futureLogDir = servers(0).config.logDirs(1)
- val futureReplica = new TopicPartitionReplica(topic, 0,
servers(0).config.brokerId)
+ val futureLogDir = brokers(0).config.logDirs(1)
+ val futureReplica = new TopicPartitionReplica(topic, 0,
brokers(0).config.brokerId)
// Verify that replica can be moved to the specified log directory
client.alterReplicaLogDirs(Map(futureReplica ->
futureLogDir).asJava).all.get
TestUtils.waitUntilTrue(() => {
- futureLogDir ==
servers(0).logManager.getLog(topicPartition).get.dir.getParent
+ futureLogDir ==
brokers(0).logManager.getLog(topicPartition).get.dir.getParent
}, "timed out waiting for replica movement")
// once replica moved, its LSO and LEO should match other replicas
- assertEquals(3,
servers.head.replicaManager.localLog(topicPartition).get.logStartOffset)
- assertEquals(expectedLEO,
servers.head.replicaManager.localLog(topicPartition).get.logEndOffset)
+ assertEquals(3,
brokers.head.replicaManager.localLog(topicPartition).get.logStartOffset)
+ assertEquals(expectedLEO,
brokers.head.replicaManager.localLog(topicPartition).get.logEndOffset)
}
- @Test
- def testOffsetsForTimesAfterDeleteRecords(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testOffsetsForTimesAfterDeleteRecords(quorum: String): Unit = {
createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)
client = Admin.create(createConfig)
@@ -886,8 +919,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertNull(consumer.offsetsForTimes(Map(topicPartition ->
JLong.valueOf(0L)).asJava).get(topicPartition))
}
- @Test
- def testConsumeAfterDeleteRecords(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testConsumeAfterDeleteRecords(quorum: String): Unit = {
val consumer = createConsumer()
subscribeAndWaitForAssignment(topic, consumer)
@@ -909,8 +943,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
TestUtils.consumeRecords(consumer, 2)
}
- @Test
- def testDeleteRecordsWithException(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDeleteRecordsWithException(quorum: String): Unit = {
val consumer = createConsumer()
subscribeAndWaitForAssignment(topic, consumer)
@@ -934,8 +969,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertEquals(classOf[LeaderNotAvailableException], cause.getClass)
}
- @Test
- def testDescribeConfigsForTopic(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeConfigsForTopic(quorum: String): Unit = {
createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)
client = Admin.create(createConfig)
@@ -982,8 +1018,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
* Also see [[kafka.api.SaslSslAdminIntegrationTest.testAclOperations()]]
for tests of ACL operations
* when the authorizer is enabled.
*/
- @Test
- def testAclOperations(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAclOperations(quorum: String): Unit = {
val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC,
"mytopic3", PatternType.LITERAL),
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE,
AclPermissionType.ALLOW))
client = Admin.create(createConfig)
@@ -998,8 +1035,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
* Test closing the AdminClient with a generous timeout. Calls in progress
should be completed,
* since they can be done within the timeout. New calls should receive
timeouts.
*/
- @Test
- def testDelayedClose(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDelayedClose(quorum: String): Unit = {
client = Admin.create(createConfig)
val topics = Seq("mytopic", "mytopic2")
val newTopics = topics.map(new NewTopic(_, 1, 1.toShort))
@@ -1015,8 +1053,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
* Test closing the AdminClient with a timeout of 0, when there are calls
with extremely long
* timeouts in progress. The calls should be aborted after the hard
shutdown timeout elapses.
*/
- @Test
- def testForceClose(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testForceClose(quorum: String): Unit = {
val config = createConfig
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
s"localhost:${TestUtils.IncorrectBrokerPort}")
client = Admin.create(config)
@@ -1032,8 +1071,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
* Check that a call with a timeout does not complete before the minimum
timeout has elapsed,
* even when the default request timeout is shorter.
*/
- @Test
- def testMinimumRequestTimeouts(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testMinimumRequestTimeouts(quorum: String): Unit = {
val config = createConfig
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
s"localhost:${TestUtils.IncorrectBrokerPort}")
config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "0")
@@ -1049,8 +1089,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
/**
* Test injecting timeouts for calls that are in flight.
*/
- @Test
- def testCallInFlightTimeouts(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCallInFlightTimeouts(quorum: String): Unit = {
val config = createConfig
config.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100000000")
config.put(AdminClientConfig.RETRIES_CONFIG, "0")
@@ -1068,8 +1109,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
/**
* Test the consumer group APIs.
*/
- @Test
- def testConsumerGroups(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testConsumerGroups(quorum: String): Unit = {
val config = createConfig
client = Admin.create(config)
try {
@@ -1287,8 +1329,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
}
- @Test
- def testDeleteConsumerGroupOffsets(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDeleteConsumerGroupOffsets(quorum: String): Unit = {
val config = createConfig
client = Admin.create(config)
try {
@@ -1359,8 +1402,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
}
- @Test
- def testElectPreferredLeaders(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testElectPreferredLeaders(quorum: String): Unit = {
client = Admin.create(createConfig)
val prefer0 = Seq(0, 1, 2)
@@ -1368,10 +1412,10 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val prefer2 = Seq(2, 0, 1)
val partition1 = new TopicPartition("elect-preferred-leaders-topic-1", 0)
- TestUtils.createTopic(zkClient, partition1.topic, Map[Int,
Seq[Int]](partition1.partition -> prefer0), servers)
+ createTopicWithAssignment(partition1.topic, Map[Int,
Seq[Int]](partition1.partition -> prefer0))
val partition2 = new TopicPartition("elect-preferred-leaders-topic-2", 0)
- TestUtils.createTopic(zkClient, partition2.topic, Map[Int,
Seq[Int]](partition2.partition -> prefer0), servers)
+ createTopicWithAssignment(partition2.topic, Map[Int,
Seq[Int]](partition2.partition -> prefer0))
def preferredLeader(topicPartition: TopicPartition): Int = {
val partitionMetadata = getTopicMetadata(client,
topicPartition.topic).partitions.get(topicPartition.partition)
@@ -1380,19 +1424,18 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
/** Changes the <i>preferred</i> leader without changing the
<i>current</i> leader. */
- def changePreferredLeader(newAssignment: Seq[Int]) = {
+ def changePreferredLeader(newAssignment: Seq[Int]): Unit = {
val preferred = newAssignment.head
- val prior1 = zkClient.getLeaderForPartition(partition1).get
- val prior2 = zkClient.getLeaderForPartition(partition2).get
-
- var m = Map.empty[TopicPartition, Seq[Int]]
+ val prior1 =
brokers.head.metadataCache.getPartitionLeaderEndpoint(partition1.topic,
partition1.partition(), listenerName).get.id()
+ val prior2 =
brokers.head.metadataCache.getPartitionLeaderEndpoint(partition2.topic,
partition2.partition(), listenerName).get.id()
+ var m = Map.empty[TopicPartition, Optional[NewPartitionReassignment]]
if (prior1 != preferred)
- m += partition1 -> newAssignment
+ m += partition1 -> Optional.of(new
NewPartitionReassignment(newAssignment.map(Int.box).asJava))
if (prior2 != preferred)
- m += partition2 -> newAssignment
+ m += partition2 -> Optional.of(new
NewPartitionReassignment(newAssignment.map(Int.box).asJava))
+ client.alterPartitionReassignments(m.asJava).all().get()
- zkClient.createPartitionReassignment(m)
TestUtils.waitUntilTrue(
() => preferredLeader(partition1) == preferred &&
preferredLeader(partition2) == preferred,
s"Expected preferred leader to become $preferred, but is
${preferredLeader(partition1)} and ${preferredLeader(partition2)}",
@@ -1408,7 +1451,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
// Noop election
var electResult = client.electLeaders(ElectionType.PREFERRED,
Set(partition1).asJava)
- var exception = electResult.partitions.get.get(partition1).get
+ val exception = electResult.partitions.get.get(partition1).get
assertEquals(classOf[ElectionNotNeededException], exception.getClass)
TestUtils.assertLeader(client, partition1, 0)
@@ -1437,13 +1480,24 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertFalse(electResult.partitions.get.get(partition2).isPresent)
TestUtils.assertLeader(client, partition2, 1)
+ def assertUnknownTopicOrPartition(
+ topicPartition: TopicPartition,
+ result: ElectLeadersResult
+ ): Unit = {
+ val exception = result.partitions.get.get(topicPartition).get
+ assertEquals(classOf[UnknownTopicOrPartitionException],
exception.getClass)
+ if (isKRaftTest()) {
+ assertEquals(s"No such topic as ${topicPartition.topic()}",
exception.getMessage)
+ } else {
+ assertEquals("The partition does not exist.", exception.getMessage)
+ }
+ }
+
// unknown topic
val unknownPartition = new TopicPartition("topic-does-not-exist", 0)
electResult = client.electLeaders(ElectionType.PREFERRED,
Set(unknownPartition).asJava)
assertEquals(Set(unknownPartition).asJava,
electResult.partitions.get.keySet)
- exception = electResult.partitions.get.get(unknownPartition).get
- assertEquals(classOf[UnknownTopicOrPartitionException], exception.getClass)
- assertEquals("The partition does not exist.", exception.getMessage)
+ assertUnknownTopicOrPartition(unknownPartition, electResult)
TestUtils.assertLeader(client, partition1, 1)
TestUtils.assertLeader(client, partition2, 1)
@@ -1455,9 +1509,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertEquals(Set(unknownPartition, partition1).asJava,
electResult.partitions.get.keySet)
TestUtils.assertLeader(client, partition1, 2)
TestUtils.assertLeader(client, partition2, 1)
- exception = electResult.partitions.get.get(unknownPartition).get
- assertEquals(classOf[UnknownTopicOrPartitionException], exception.getClass)
- assertEquals("The partition does not exist.", exception.getMessage)
+ assertUnknownTopicOrPartition(unknownPartition, electResult)
// elect preferred leader for partition 2
electResult = client.electLeaders(ElectionType.PREFERRED,
Set(partition2).asJava)
@@ -1468,41 +1520,48 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
// Now change the preferred leader to 1
changePreferredLeader(prefer1)
// but shut it down...
- servers(1).shutdown()
+ brokers(1).shutdown()
TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2),
Set(1))
+ def assertPreferredLeaderNotAvailable(
+ topicPartition: TopicPartition,
+ result: ElectLeadersResult
+ ): Unit = {
+ val exception = result.partitions.get.get(topicPartition).get
+ assertEquals(classOf[PreferredLeaderNotAvailableException],
exception.getClass)
+ if (isKRaftTest()) {
+ assertTrue(exception.getMessage.contains(
+ "The preferred leader was not available."),
+ s"Unexpected message: ${exception.getMessage}")
+ } else {
+ assertTrue(exception.getMessage.contains(
+ s"Failed to elect leader for partition $topicPartition under
strategy PreferredReplicaPartitionLeaderElectionStrategy"),
+ s"Unexpected message: ${exception.getMessage}")
+ }
+ }
+
// ... now what happens if we try to elect the preferred leader and it's
down?
val shortTimeout = new ElectLeadersOptions().timeoutMs(10000)
electResult = client.electLeaders(ElectionType.PREFERRED,
Set(partition1).asJava, shortTimeout)
assertEquals(Set(partition1).asJava, electResult.partitions.get.keySet)
- exception = electResult.partitions.get.get(partition1).get
- assertEquals(classOf[PreferredLeaderNotAvailableException],
exception.getClass)
- assertTrue(exception.getMessage.contains(
- "Failed to elect leader for partition elect-preferred-leaders-topic-1-0
under strategy PreferredReplicaPartitionLeaderElectionStrategy"),
- s"Wrong message ${exception.getMessage}")
+
+ assertPreferredLeaderNotAvailable(partition1, electResult)
TestUtils.assertLeader(client, partition1, 2)
// preferred leader unavailable with null argument
electResult = client.electLeaders(ElectionType.PREFERRED, null,
shortTimeout)
+ assertTrue(Set(partition1,
partition2).subsetOf(electResult.partitions.get.keySet.asScala))
- exception = electResult.partitions.get.get(partition1).get
- assertEquals(classOf[PreferredLeaderNotAvailableException],
exception.getClass)
- assertTrue(exception.getMessage.contains(
- "Failed to elect leader for partition elect-preferred-leaders-topic-1-0
under strategy PreferredReplicaPartitionLeaderElectionStrategy"),
- s"Wrong message ${exception.getMessage}")
-
- exception = electResult.partitions.get.get(partition2).get
- assertEquals(classOf[PreferredLeaderNotAvailableException],
exception.getClass)
- assertTrue(exception.getMessage.contains(
- "Failed to elect leader for partition elect-preferred-leaders-topic-2-0
under strategy PreferredReplicaPartitionLeaderElectionStrategy"),
- s"Wrong message ${exception.getMessage}")
-
+ assertPreferredLeaderNotAvailable(partition1, electResult)
TestUtils.assertLeader(client, partition1, 2)
+
+ assertPreferredLeaderNotAvailable(partition2, electResult)
TestUtils.assertLeader(client, partition2, 2)
}
- @Test
- def testElectUncleanLeadersForOnePartition(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testElectUncleanLeadersForOnePartition(quorum: String): Unit = {
// Case: unclean leader election with one topic partition
client = Admin.create(createConfig)
@@ -1511,23 +1570,24 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val assignment1 = Seq(broker1, broker2)
val partition1 = new TopicPartition("unclean-test-topic-1", 0)
- TestUtils.createTopic(zkClient, partition1.topic, Map[Int,
Seq[Int]](partition1.partition -> assignment1), servers)
+ createTopicWithAssignment(partition1.topic, Map[Int,
Seq[Int]](partition1.partition -> assignment1))
TestUtils.assertLeader(client, partition1, broker1)
- servers(broker2).shutdown()
+ brokers(broker2).shutdown()
TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
- servers(broker1).shutdown()
+ brokers(broker1).shutdown()
TestUtils.assertNoLeader(client, partition1)
- servers(broker2).startup()
+ brokers(broker2).startup()
val electResult = client.electLeaders(ElectionType.UNCLEAN,
Set(partition1).asJava)
assertFalse(electResult.partitions.get.get(partition1).isPresent)
TestUtils.assertLeader(client, partition1, broker2)
}
- @Test
- def testElectUncleanLeadersForManyPartitions(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testElectUncleanLeadersForManyPartitions(quorum: String): Unit = {
// Case: unclean leader election with many topic partitions
client = Admin.create(createConfig)
@@ -1540,22 +1600,20 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val partition1 = new TopicPartition(topic, 0)
val partition2 = new TopicPartition(topic, 1)
- TestUtils.createTopic(
- zkClient,
+ createTopicWithAssignment(
topic,
- Map(partition1.partition -> assignment1, partition2.partition ->
assignment2),
- servers
+ Map(partition1.partition -> assignment1, partition2.partition ->
assignment2)
)
TestUtils.assertLeader(client, partition1, broker1)
TestUtils.assertLeader(client, partition2, broker1)
- servers(broker2).shutdown()
+ brokers(broker2).shutdown()
TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2),
Set(broker2))
- servers(broker1).shutdown()
+ brokers(broker1).shutdown()
TestUtils.assertNoLeader(client, partition1)
TestUtils.assertNoLeader(client, partition2)
- servers(broker2).startup()
+ brokers(broker2).startup()
val electResult = client.electLeaders(ElectionType.UNCLEAN,
Set(partition1, partition2).asJava)
assertFalse(electResult.partitions.get.get(partition1).isPresent)
@@ -1564,8 +1622,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
TestUtils.assertLeader(client, partition2, broker2)
}
- @Test
- def testElectUncleanLeadersForAllPartitions(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testElectUncleanLeadersForAllPartitions(quorum: String): Unit = {
// Case: noop unclean leader election and valid unclean leader election
for all partitions
client = Admin.create(createConfig)
@@ -1579,22 +1638,20 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val partition1 = new TopicPartition(topic, 0)
val partition2 = new TopicPartition(topic, 1)
- TestUtils.createTopic(
- zkClient,
+ createTopicWithAssignment(
topic,
- Map(partition1.partition -> assignment1, partition2.partition ->
assignment2),
- servers
+ Map(partition1.partition -> assignment1, partition2.partition ->
assignment2)
)
TestUtils.assertLeader(client, partition1, broker1)
TestUtils.assertLeader(client, partition2, broker1)
- servers(broker2).shutdown()
+ brokers(broker2).shutdown()
TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
- servers(broker1).shutdown()
+ brokers(broker1).shutdown()
TestUtils.assertNoLeader(client, partition1)
TestUtils.assertLeader(client, partition2, broker3)
- servers(broker2).startup()
+ brokers(broker2).startup()
val electResult = client.electLeaders(ElectionType.UNCLEAN, null)
assertFalse(electResult.partitions.get.get(partition1).isPresent)
@@ -1603,8 +1660,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
TestUtils.assertLeader(client, partition2, broker3)
}
- @Test
- def testElectUncleanLeadersForUnknownPartitions(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testElectUncleanLeadersForUnknownPartitions(quorum: String): Unit = {
// Case: unclean leader election for unknown topic
client = Admin.create(createConfig)
@@ -1616,11 +1674,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val unknownPartition = new TopicPartition(topic, 1)
val unknownTopic = new TopicPartition("unknown-topic", 0)
- TestUtils.createTopic(
- zkClient,
+ createTopicWithAssignment(
topic,
- Map(0 -> assignment1),
- servers
+ Map(0 -> assignment1)
)
TestUtils.assertLeader(client, new TopicPartition(topic, 0), broker1)
@@ -1630,8 +1686,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertTrue(electResult.partitions.get.get(unknownTopic).get.isInstanceOf[UnknownTopicOrPartitionException])
}
- @Test
- def testElectUncleanLeadersWhenNoLiveBrokers(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testElectUncleanLeadersWhenNoLiveBrokers(quorum: String): Unit = {
// Case: unclean leader election with no live brokers
client = Admin.create(createConfig)
@@ -1642,26 +1699,25 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val topic = "unclean-test-topic-1"
val partition1 = new TopicPartition(topic, 0)
- TestUtils.createTopic(
- zkClient,
+ createTopicWithAssignment(
topic,
- Map(partition1.partition -> assignment1),
- servers
+ Map(partition1.partition -> assignment1)
)
TestUtils.assertLeader(client, partition1, broker1)
- servers(broker2).shutdown()
+ brokers(broker2).shutdown()
TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
- servers(broker1).shutdown()
+ brokers(broker1).shutdown()
TestUtils.assertNoLeader(client, partition1)
val electResult = client.electLeaders(ElectionType.UNCLEAN,
Set(partition1).asJava)
assertTrue(electResult.partitions.get.get(partition1).get.isInstanceOf[EligibleLeadersNotAvailableException])
}
- @Test
- def testElectUncleanLeadersNoop(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testElectUncleanLeadersNoop(quorum: String): Unit = {
// Case: noop unclean leader election with explicit topic partitions
client = Admin.create(createConfig)
@@ -1672,25 +1728,24 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val topic = "unclean-test-topic-1"
val partition1 = new TopicPartition(topic, 0)
- TestUtils.createTopic(
- zkClient,
+ createTopicWithAssignment(
topic,
- Map(partition1.partition -> assignment1),
- servers
+ Map(partition1.partition -> assignment1)
)
TestUtils.assertLeader(client, partition1, broker1)
- servers(broker1).shutdown()
+ brokers(broker1).shutdown()
TestUtils.assertLeader(client, partition1, broker2)
- servers(broker1).startup()
+ brokers(broker1).startup()
val electResult = client.electLeaders(ElectionType.UNCLEAN,
Set(partition1).asJava)
assertTrue(electResult.partitions.get.get(partition1).get.isInstanceOf[ElectionNotNeededException])
}
- @Test
- def testElectUncleanLeadersAndNoop(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testElectUncleanLeadersAndNoop(quorum: String): Unit = {
// Case: one noop unclean leader election and one valid unclean leader
election
client = Admin.create(createConfig)
@@ -1704,22 +1759,20 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val partition1 = new TopicPartition(topic, 0)
val partition2 = new TopicPartition(topic, 1)
- TestUtils.createTopic(
- zkClient,
+ createTopicWithAssignment(
topic,
- Map(partition1.partition -> assignment1, partition2.partition ->
assignment2),
- servers
+ Map(partition1.partition -> assignment1, partition2.partition ->
assignment2)
)
TestUtils.assertLeader(client, partition1, broker1)
TestUtils.assertLeader(client, partition2, broker1)
- servers(broker2).shutdown()
+ brokers(broker2).shutdown()
TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
- servers(broker1).shutdown()
+ brokers(broker1).shutdown()
TestUtils.assertNoLeader(client, partition1)
TestUtils.assertLeader(client, partition2, broker3)
- servers(broker2).startup()
+ brokers(broker2).startup()
val electResult = client.electLeaders(ElectionType.UNCLEAN,
Set(partition1, partition2).asJava)
assertFalse(electResult.partitions.get.get(partition1).isPresent)
@@ -1728,8 +1781,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
TestUtils.assertLeader(client, partition2, broker3)
}
- @Test
- def testListReassignmentsDoesNotShowNonReassigningPartitions(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testListReassignmentsDoesNotShowNonReassigningPartitions(quorum:
String): Unit = {
client = Admin.create(createConfig)
// Create topics
@@ -1744,8 +1798,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertEquals(0, allReassignmentsMap.size())
}
- @Test
- def testListReassignmentsDoesNotShowDeletedPartitions(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testListReassignmentsDoesNotShowDeletedPartitions(quorum: String): Unit
= {
client = Admin.create(createConfig)
val topic = "list-reassignments-no-reassignments"
@@ -1925,8 +1980,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertEquals(appendValues,
configs.get(topicResource).get(LogConfig.LeaderReplicationThrottledReplicasProp).value)
}
- @Test
- def testIncrementalAlterConfigsDeleteAndSetBrokerConfigs(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testIncrementalAlterConfigsDeleteAndSetBrokerConfigs(quorum: String):
Unit = {
client = Admin.create(createConfig)
val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0")
client.incrementalAlterConfigs(Map(broker0Resource ->
@@ -1937,9 +1993,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
).asJavaCollection).asJava).all().get()
TestUtils.waitUntilTrue(() => {
val broker0Configs = client.describeConfigs(Seq(broker0Resource).asJava).
- all().get().get(broker0Resource).entries().asScala.map {
- case entry => (entry.name, entry.value)
- }.toMap
+ all().get().get(broker0Resource).entries().asScala.map(entry =>
(entry.name, entry.value)).toMap
("123".equals(broker0Configs.getOrElse(DynamicConfig.Broker.LeaderReplicationThrottledRateProp,
"")) &&
"456".equals(broker0Configs.getOrElse(DynamicConfig.Broker.FollowerReplicationThrottledRateProp,
"")))
}, "Expected to see the broker properties we just set", pause=25)
@@ -1953,17 +2007,16 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
).asJavaCollection).asJava).all().get()
TestUtils.waitUntilTrue(() => {
val broker0Configs = client.describeConfigs(Seq(broker0Resource).asJava).
- all().get().get(broker0Resource).entries().asScala.map {
- case entry => (entry.name, entry.value)
- }.toMap
+ all().get().get(broker0Resource).entries().asScala.map(entry =>
(entry.name, entry.value)).toMap
("".equals(broker0Configs.getOrElse(DynamicConfig.Broker.LeaderReplicationThrottledRateProp,
"")) &&
"654".equals(broker0Configs.getOrElse(DynamicConfig.Broker.FollowerReplicationThrottledRateProp,
"")) &&
"987".equals(broker0Configs.getOrElse(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp,
"")))
}, "Expected to see the broker properties we just modified", pause=25)
}
- @Test
- def testIncrementalAlterConfigsDeleteBrokerConfigs(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testIncrementalAlterConfigsDeleteBrokerConfigs(quorum: String): Unit = {
client = Admin.create(createConfig)
val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0")
client.incrementalAlterConfigs(Map(broker0Resource ->
@@ -1976,9 +2029,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
).asJavaCollection).asJava).all().get()
TestUtils.waitUntilTrue(() => {
val broker0Configs = client.describeConfigs(Seq(broker0Resource).asJava).
- all().get().get(broker0Resource).entries().asScala.map {
- case entry => (entry.name, entry.value)
- }.toMap
+ all().get().get(broker0Resource).entries().asScala.map(entry =>
(entry.name, entry.value)).toMap
("123".equals(broker0Configs.getOrElse(DynamicConfig.Broker.LeaderReplicationThrottledRateProp,
"")) &&
"456".equals(broker0Configs.getOrElse(DynamicConfig.Broker.FollowerReplicationThrottledRateProp,
"")) &&
"789".equals(broker0Configs.getOrElse(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp,
"")))
@@ -1993,17 +2044,16 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
).asJavaCollection).asJava).all().get()
TestUtils.waitUntilTrue(() => {
val broker0Configs = client.describeConfigs(Seq(broker0Resource).asJava).
- all().get().get(broker0Resource).entries().asScala.map {
- case entry => (entry.name, entry.value)
- }.toMap
+ all().get().get(broker0Resource).entries().asScala.map(entry =>
(entry.name, entry.value)).toMap
("".equals(broker0Configs.getOrElse(DynamicConfig.Broker.LeaderReplicationThrottledRateProp,
"")) &&
"".equals(broker0Configs.getOrElse(DynamicConfig.Broker.FollowerReplicationThrottledRateProp,
"")) &&
"".equals(broker0Configs.getOrElse(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp,
"")))
}, "Expected to see the broker properties we just removed to be deleted",
pause=25)
}
- @Test
- def testInvalidIncrementalAlterConfigs(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testInvalidIncrementalAlterConfigs(quorum: String): Unit = {
client = Admin.create(createConfig)
// Create topics
@@ -2015,14 +2065,14 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val topic2Resource = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
createTopic(topic2)
- //Add duplicate Keys for topic1
+ // Add duplicate Keys for topic1
var topic1AlterConfigs = Seq(
new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp,
"0.75"), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp,
"0.65"), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry(LogConfig.CompressionTypeProp,
"gzip"), AlterConfigOp.OpType.SET) // valid entry
).asJavaCollection
- //Add valid config for topic2
+ // Add valid config for topic2
var topic2AlterConfigs = Seq(
new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp,
"0.9"), AlterConfigOp.OpType.SET)
).asJavaCollection
@@ -2033,12 +2083,13 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
).asJava)
assertEquals(Set(topic1Resource, topic2Resource).asJava,
alterResult.values.keySet)
- //InvalidRequestException error for topic1
+ // InvalidRequestException error for topic1
assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource),
classOf[InvalidRequestException],
Some("Error due to duplicate config keys"))
- //operation should succeed for topic2
+ // Operation should succeed for topic2
alterResult.values().get(topic2Resource).get()
+ ensureConsistentKRaftMetadata()
// Verify that topic1 config is not updated, and topic2 config is
updated
val describeResult = client.describeConfigs(Seq(topic1Resource,
topic2Resource).asJava)
@@ -2046,10 +2097,10 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertEquals(2, configs.size)
assertEquals(Defaults.LogCleanerMinCleanRatio.toString,
configs.get(topic1Resource).get(LogConfig.MinCleanableDirtyRatioProp).value)
- assertEquals(Defaults.CompressionType.toString,
configs.get(topic1Resource).get(LogConfig.CompressionTypeProp).value)
+ assertEquals(Defaults.CompressionType,
configs.get(topic1Resource).get(LogConfig.CompressionTypeProp).value)
assertEquals("0.9",
configs.get(topic2Resource).get(LogConfig.MinCleanableDirtyRatioProp).value)
- //check invalid use of append/subtract operation types
+ // Check invalid use of append/subtract operation types
topic1AlterConfigs = Seq(
new AlterConfigOp(new ConfigEntry(LogConfig.CompressionTypeProp,
"gzip"), AlterConfigOp.OpType.APPEND)
).asJavaCollection
@@ -2064,14 +2115,21 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
).asJava)
assertEquals(Set(topic1Resource, topic2Resource).asJava,
alterResult.values.keySet)
- assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource),
classOf[InvalidRequestException],
- Some("Config value append is not allowed for config"))
-
- assertFutureExceptionTypeEquals(alterResult.values().get(topic2Resource),
classOf[InvalidRequestException],
- Some("Config value subtract is not allowed for config"))
+ assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource),
classOf[InvalidConfigurationException],
+ if (isKRaftTest()) {
+ Some("Can't APPEND to key compression.type because its type is not
LIST.")
+ } else {
+ Some("Config value append is not allowed for config")
+ })
+ assertFutureExceptionTypeEquals(alterResult.values().get(topic2Resource),
classOf[InvalidConfigurationException],
+ if (isKRaftTest()) {
+ Some("Can't SUBTRACT to key compression.type because its type is not
LIST.")
+ } else {
+ Some("Config value subtract is not allowed for config")
+ })
- //try to add invalid config
+ // Try to add invalid config
topic1AlterConfigs = Seq(
new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp,
"1.1"), AlterConfigOp.OpType.SET)
).asJavaCollection
@@ -2082,11 +2140,16 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertEquals(Set(topic1Resource).asJava, alterResult.values.keySet)
assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource),
classOf[InvalidConfigurationException],
- Some("Invalid config value for resource"))
+ if (isKRaftTest()) {
+ Some("Invalid value 1.1 for configuration min.cleanable.dirty.ratio:
Value must be no more than 1")
+ } else {
+ Some("Invalid config value for resource")
+ })
}
- @Test
- def testInvalidAlterPartitionReassignments(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testInvalidAlterPartitionReassignments(quorum: String): Unit = {
client = Admin.create(createConfig)
val topic = "alter-reassignments-topic-1"
val tp1 = new TopicPartition(topic, 0)
@@ -2124,8 +2187,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp3),
classOf[InvalidReplicaAssignmentException])
}
- @Test
- def testLongTopicNames(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testLongTopicNames(quorum: String): Unit = {
val client = Admin.create(createConfig)
val longTopicName = String.join("", Collections.nCopies(249, "x"));
val invalidTopicName = String.join("", Collections.nCopies(250, "x"));
@@ -2137,14 +2201,15 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertTrue(results.containsKey(invalidTopicName))
assertFutureExceptionTypeEquals(results.get(invalidTopicName),
classOf[InvalidTopicException])
assertFutureExceptionTypeEquals(client.alterReplicaLogDirs(
- Map(new TopicPartitionReplica(longTopicName, 0, 0) ->
servers(0).config.logDirs(0)).asJava).all(),
+ Map(new TopicPartitionReplica(longTopicName, 0, 0) ->
brokers(0).config.logDirs(0)).asJava).all(),
classOf[InvalidTopicException])
client.close()
}
// Verify that createTopics and alterConfigs fail with null values
- @Test
- def testNullConfigs(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk"))
+ def testNullConfigs(quorum: String): Unit = {
def validateLogConfig(compressionType: String): Unit = {
val logConfig = zkClient.getLogConfigs(Set(topic),
Collections.emptyMap[String, AnyRef])._1(topic)
@@ -2182,8 +2247,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
validateLogConfig(compressionType = "producer")
}
- @Test
- def testDescribeConfigsForLog4jLogLevels(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeConfigsForLog4jLogLevels(quorum: String): Unit = {
client = Admin.create(createConfig)
LoggerFactory.getLogger("kafka.cluster.Replica").trace("Message to create
the logger")
val loggerConfig = describeBrokerLoggers()
@@ -2198,9 +2264,10 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertTrue(logCleanerLogLevelConfig.synonyms().isEmpty)
}
- @Test
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
@Disabled // To be re-enabled once KAFKA-8779 is resolved
- def testIncrementalAlterConfigsForLog4jLogLevels(): Unit = {
+ def testIncrementalAlterConfigsForLog4jLogLevels(quorum: String): Unit = {
client = Admin.create(createConfig)
val initialLoggerConfig = describeBrokerLoggers()
@@ -2262,9 +2329,10 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
* 4. Change ROOT logger to ERROR
* 5. Ensure the kafka.controller.KafkaController logger's level is ERROR
(the current root logger level)
*/
- @Test
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
@Disabled // To be re-enabled once KAFKA-8779 is resolved
- def
testIncrementalAlterConfigsForLog4jLogLevelsCanResetLoggerToCurrentRoot(): Unit
= {
+ def
testIncrementalAlterConfigsForLog4jLogLevelsCanResetLoggerToCurrentRoot(quorum:
String): Unit = {
client = Admin.create(createConfig)
// step 1 - configure root logger
val initialRootLogLevel = LogLevelConfig.TRACE_LOG_LEVEL
@@ -2304,9 +2372,10 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertEquals(newRootLogLevel,
newRootLoggerConfig.get("kafka.controller.KafkaController").value())
}
- @Test
- @Disabled // To be re-enabled once KAFKA-8779 is resolved
- def testIncrementalAlterConfigsForLog4jLogLevelsCannotResetRootLogger():
Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ @Disabled // Zk to be re-enabled once KAFKA-8779 is resolved
+ def
testIncrementalAlterConfigsForLog4jLogLevelsCannotResetRootLogger(quorum:
String): Unit = {
client = Admin.create(createConfig)
val deleteRootLoggerEntry = Seq(
new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, ""),
AlterConfigOp.OpType.DELETE)
@@ -2315,9 +2384,10 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertTrue(assertThrows(classOf[ExecutionException], () =>
alterBrokerLoggers(deleteRootLoggerEntry)).getCause.isInstanceOf[InvalidRequestException])
}
- @Test
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
@Disabled // To be re-enabled once KAFKA-8779 is resolved
- def
testIncrementalAlterConfigsForLog4jLogLevelsDoesNotWorkWithInvalidConfigs():
Unit = {
+ def
testIncrementalAlterConfigsForLog4jLogLevelsDoesNotWorkWithInvalidConfigs(quorum:
String): Unit = {
client = Admin.create(createConfig)
val validLoggerName = "kafka.server.KafkaRequestHandler"
val expectedValidLoggerLogLevel =
describeBrokerLoggers().get(validLoggerName)
@@ -2359,9 +2429,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
* The AlterConfigs API is deprecated and should not support altering log
levels
*/
@nowarn("cat=deprecation")
- @Test
- @Disabled // To be re-enabled once KAFKA-8779 is resolved
- def testAlterConfigsForLog4jLogLevelsDoesNotWork(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("kraft")) // Zk to be re-enabled once
KAFKA-8779 is resolved
+ def testAlterConfigsForLog4jLogLevelsDoesNotWork(quorum: String): Unit = {
client = Admin.create(createConfig)
val alterLogLevelsEntries = Seq(
@@ -2591,9 +2661,9 @@ object PlaintextAdminIntegrationTest {
).asJava)
assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava,
alterResult.values.keySet)
- assertTrue(assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(topicResource1).get).getCause.isInstanceOf[InvalidConfigurationException])
+ assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1),
classOf[InvalidConfigurationException])
alterResult.values.get(topicResource2).get
- assertTrue(assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
+ assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource),
classOf[InvalidRequestException])
// Verify that first and third resources were not updated and second was
updated
test.ensureConsistentKRaftMetadata()
@@ -2620,9 +2690,9 @@ object PlaintextAdminIntegrationTest {
).asJava, new AlterConfigsOptions().validateOnly(true))
assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava,
alterResult.values.keySet)
- assertTrue(assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(topicResource1).get).getCause.isInstanceOf[InvalidConfigurationException])
+ assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1),
classOf[InvalidConfigurationException])
alterResult.values.get(topicResource2).get
- assertTrue(assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
+ assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource),
classOf[InvalidRequestException])
// Verify that no resources are updated since validate_only = true
test.ensureConsistentKRaftMetadata()