This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1103c76d63a KAFKA-13899: Use INVALID_CONFIG error code consistently in
AlterConfig APIs (#12162)
1103c76d63a is described below
commit 1103c76d63a6bf14563eac8b5aaec4836061ff7e
Author: Jason Gustafson <[email protected]>
AuthorDate: Mon May 16 17:41:23 2022 -0700
KAFKA-13899: Use INVALID_CONFIG error code consistently in AlterConfig APIs
(#12162)
In the AlterConfigs/IncrementalAlterConfigs zk handler, we return
`INVALID_REQUEST` and `INVALID_CONFIG` inconsistently. The problem is in
`LogConfig.validate`. It may throw either `ConfigException` or
`InvalidConfigurationException`. When the first of these is thrown, we catch it and
convert to `INVALID_REQUEST`. If the latter is thrown, then we return
`INVALID_CONFIG`. It seems more appropriate to return `INVALID_CONFIG`
consistently, which is what the KRaft implementation already does.
Reviewers: José Armando García Sancio <[email protected]>
---
.../main/scala/kafka/server/ZkAdminManager.scala | 11 ++-
.../AdminClientWithPoliciesIntegrationTest.scala | 55 +++++++++-----
.../kafka/api/PlaintextAdminIntegrationTest.scala | 83 +++++++++++++---------
.../kafka/integration/KafkaServerTestHarness.scala | 9 +++
.../kafka/server/DynamicConfigChangeTest.scala | 28 +++++---
5 files changed, 123 insertions(+), 63 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala
b/core/src/main/scala/kafka/server/ZkAdminManager.scala
index 2852cd141fe..cb6796a09d5 100644
--- a/core/src/main/scala/kafka/server/ZkAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ZkAdminManager.scala
@@ -407,7 +407,7 @@ class ZkAdminManager(val config: KafkaConfig,
case e @ (_: ConfigException | _: IllegalArgumentException) =>
val message = s"Invalid config value for resource $resource:
${e.getMessage}"
info(message)
- resource -> ApiError.fromThrowable(new
InvalidRequestException(message, e))
+ resource -> ApiError.fromThrowable(new
InvalidConfigurationException(message, e))
case e: Throwable =>
val configProps = new Properties
config.entries.asScala.filter(_.value != null).foreach { configEntry
=>
@@ -427,6 +427,10 @@ class ZkAdminManager(val config: KafkaConfig,
private def alterTopicConfigs(resource: ConfigResource, validateOnly:
Boolean,
configProps: Properties, configEntriesMap:
Map[String, String]): (ConfigResource, ApiError) = {
val topic = resource.name
+ if (topic.isEmpty()) {
+ throw new InvalidRequestException("Default topic resources are not
allowed.")
+ }
+
if (!metadataCache.contains(topic))
throw new UnknownTopicOrPartitionException(s"The topic '$topic' does not
exist.")
@@ -489,6 +493,9 @@ class ZkAdminManager(val config: KafkaConfig,
resource.`type` match {
case ConfigResource.Type.TOPIC =>
+ if (resource.name.isEmpty()) {
+ throw new InvalidRequestException("Default topic resources are
not allowed.")
+ }
val configProps =
adminZkClient.fetchEntityConfig(ConfigType.Topic, resource.name)
prepareIncrementalConfigs(alterConfigOps, configProps,
LogConfig.configKeys)
alterTopicConfigs(resource, validateOnly, configProps,
configEntriesMap)
@@ -511,7 +518,7 @@ class ZkAdminManager(val config: KafkaConfig,
case e @ (_: ConfigException | _: IllegalArgumentException) =>
val message = s"Invalid config value for resource $resource:
${e.getMessage}"
info(message)
- resource -> ApiError.fromThrowable(new
InvalidRequestException(message, e))
+ resource -> ApiError.fromThrowable(new
InvalidConfigurationException(message, e))
case e: Throwable =>
// Log client errors at a lower level than unexpected exceptions
val message = s"Error processing alter configs request for resource
$resource, config $alterConfigOps"
diff --git
a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
index ab75dc31fb3..fb1b0d248db 100644
---
a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
@@ -16,17 +16,20 @@ package kafka.api
import java.util
import java.util.Properties
import java.util.concurrent.ExecutionException
+
import kafka.integration.KafkaServerTestHarness
import kafka.log.LogConfig
import kafka.server.{Defaults, KafkaConfig}
-import kafka.utils.{Logging, TestUtils}
+import kafka.utils.{Logging, TestInfoUtils, TestUtils}
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig,
AlterConfigsOptions, Config, ConfigEntry}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
-import org.apache.kafka.common.errors.{InvalidRequestException,
PolicyViolationException}
+import org.apache.kafka.common.errors.{InvalidConfigurationException,
InvalidRequestException, PolicyViolationException}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.policy.AlterConfigPolicy
import org.junit.jupiter.api.Assertions.{assertEquals, assertNull,
assertThrows, assertTrue}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
@@ -45,7 +48,7 @@ class AdminClientWithPoliciesIntegrationTest extends
KafkaServerTestHarness with
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
- TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
+ TestUtils.waitUntilBrokerMetadataIsPropagated(brokers)
}
@AfterEach
@@ -58,14 +61,25 @@ class AdminClientWithPoliciesIntegrationTest extends
KafkaServerTestHarness with
def createConfig: util.Map[String, Object] =
Map[String, Object](AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG ->
bootstrapServers()).asJava
- override def generateConfigs = {
- val configs = TestUtils.createBrokerConfigs(brokerCount, zkConnect)
- configs.foreach(props =>
props.put(KafkaConfig.AlterConfigPolicyClassNameProp, classOf[Policy]))
+ override def generateConfigs: collection.Seq[KafkaConfig] = {
+ val configs = TestUtils.createBrokerConfigs(brokerCount, zkConnectOrNull)
+ configs.foreach(overrideNodeConfigs)
configs.map(KafkaConfig.fromProps)
}
- @Test
- def testValidAlterConfigs(): Unit = {
+ override def kraftControllerConfigs(): Seq[Properties] = {
+ val props = new Properties()
+ overrideNodeConfigs(props)
+ Seq(props)
+ }
+
+ private def overrideNodeConfigs(props: Properties): Unit = {
+ props.put(KafkaConfig.AlterConfigPolicyClassNameProp, classOf[Policy])
+ }
+
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testValidAlterConfigs(quorum: String): Unit = {
client = Admin.create(createConfig)
// Create topics
val topic1 = "describe-alter-configs-topic-1"
@@ -79,18 +93,20 @@ class AdminClientWithPoliciesIntegrationTest extends
KafkaServerTestHarness with
val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
createTopic(topic2, 1, 1)
- PlaintextAdminIntegrationTest.checkValidAlterConfigs(client,
topicResource1, topicResource2)
+ PlaintextAdminIntegrationTest.checkValidAlterConfigs(client, this,
topicResource1, topicResource2)
}
- @Test
- def testInvalidAlterConfigs(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testInvalidAlterConfigs(quorum: String): Unit = {
client = Admin.create(createConfig)
- PlaintextAdminIntegrationTest.checkInvalidAlterConfigs(zkClient, servers,
client)
+ PlaintextAdminIntegrationTest.checkInvalidAlterConfigs(this, client)
}
@nowarn("cat=deprecation")
- @Test
- def testInvalidAlterConfigsDueToPolicy(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testInvalidAlterConfigsDueToPolicy(quorum: String): Unit = {
client = Admin.create(createConfig)
// Create topics
@@ -115,7 +131,7 @@ class AdminClientWithPoliciesIntegrationTest extends
KafkaServerTestHarness with
val topicConfigEntries3 = Seq(new
ConfigEntry(LogConfig.MinInSyncReplicasProp, "-1")).asJava
- val brokerResource = new ConfigResource(ConfigResource.Type.BROKER,
servers.head.config.brokerId.toString)
+ val brokerResource = new ConfigResource(ConfigResource.Type.BROKER,
brokers.head.config.brokerId.toString)
val brokerConfigEntries = Seq(new
ConfigEntry(KafkaConfig.SslTruststorePasswordProp, "12313")).asJava
// Alter configs: second is valid, the others are invalid
@@ -129,10 +145,11 @@ class AdminClientWithPoliciesIntegrationTest extends
KafkaServerTestHarness with
assertEquals(Set(topicResource1, topicResource2, topicResource3,
brokerResource).asJava, alterResult.values.keySet)
assertTrue(assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException])
alterResult.values.get(topicResource2).get
- assertTrue(assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(topicResource3).get).getCause.isInstanceOf[InvalidRequestException])
+ assertTrue(assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(topicResource3).get).getCause.isInstanceOf[InvalidConfigurationException])
assertTrue(assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
// Verify that the second resource was updated and the others were not
+ ensureConsistentKRaftMetadata()
var describeResult = client.describeConfigs(Seq(topicResource1,
topicResource2, topicResource3, brokerResource).asJava)
var configs = describeResult.all.get
assertEquals(4, configs.size)
@@ -157,10 +174,11 @@ class AdminClientWithPoliciesIntegrationTest extends
KafkaServerTestHarness with
assertEquals(Set(topicResource1, topicResource2, topicResource3,
brokerResource).asJava, alterResult.values.keySet)
assertTrue(assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException])
alterResult.values.get(topicResource2).get
- assertTrue(assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(topicResource3).get).getCause.isInstanceOf[InvalidRequestException])
+ assertTrue(assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(topicResource3).get).getCause.isInstanceOf[InvalidConfigurationException])
assertTrue(assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
// Verify that no resources are updated since validate_only = true
+ ensureConsistentKRaftMetadata()
describeResult = client.describeConfigs(Seq(topicResource1,
topicResource2, topicResource3, brokerResource).asJava)
configs = describeResult.all.get
assertEquals(4, configs.size)
@@ -173,7 +191,6 @@ class AdminClientWithPoliciesIntegrationTest extends
KafkaServerTestHarness with
assertNull(configs.get(brokerResource).get(KafkaConfig.SslTruststorePasswordProp).value)
}
-
}
object AdminClientWithPoliciesIntegrationTest {
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index d6aa7a7e9a2..543b3b80cdc 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -25,12 +25,13 @@ import java.util.concurrent.atomic.{AtomicBoolean,
AtomicInteger}
import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
import java.util.{Collections, Optional, Properties}
import java.{time, util}
+
+import kafka.integration.KafkaServerTestHarness
import kafka.log.LogConfig
import kafka.security.authorizer.AclEntry
-import kafka.server.{Defaults, DynamicConfig, KafkaConfig, KafkaServer}
+import kafka.server.{Defaults, DynamicConfig, KafkaConfig}
import kafka.utils.TestUtils._
import kafka.utils.{Log4jController, TestInfoUtils, TestUtils}
-import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.HostResolver
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource
@@ -353,8 +354,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
}
- @Test
- def testDescribeAndAlterConfigs(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeAndAlterConfigs(quorum: String): Unit = {
client = Admin.create(createConfig)
// Create topics
@@ -370,8 +372,8 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
createTopic(topic2)
// Describe topics and broker
- val brokerResource1 = new ConfigResource(ConfigResource.Type.BROKER,
servers(1).config.brokerId.toString)
- val brokerResource2 = new ConfigResource(ConfigResource.Type.BROKER,
servers(2).config.brokerId.toString)
+ val brokerResource1 = new ConfigResource(ConfigResource.Type.BROKER,
brokers(1).config.brokerId.toString)
+ val brokerResource2 = new ConfigResource(ConfigResource.Type.BROKER,
brokers(2).config.brokerId.toString)
val configResources = Seq(topicResource1, topicResource2, brokerResource1,
brokerResource2)
val describeResult = client.describeConfigs(configResources.asJava)
val configs = describeResult.all.get
@@ -395,10 +397,10 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertFalse(maxMessageBytes2.isSensitive)
assertFalse(maxMessageBytes2.isReadOnly)
- assertEquals(servers(1).config.nonInternalValues.size,
configs.get(brokerResource1).entries.size)
- assertEquals(servers(1).config.brokerId.toString,
configs.get(brokerResource1).get(KafkaConfig.BrokerIdProp).value)
+ assertEquals(brokers(1).config.nonInternalValues.size,
configs.get(brokerResource1).entries.size)
+ assertEquals(brokers(1).config.brokerId.toString,
configs.get(brokerResource1).get(KafkaConfig.BrokerIdProp).value)
val listenerSecurityProtocolMap =
configs.get(brokerResource1).get(KafkaConfig.ListenerSecurityProtocolMapProp)
-
assertEquals(servers(1).config.getString(KafkaConfig.ListenerSecurityProtocolMapProp),
listenerSecurityProtocolMap.value)
+
assertEquals(brokers(1).config.getString(KafkaConfig.ListenerSecurityProtocolMapProp),
listenerSecurityProtocolMap.value)
assertEquals(KafkaConfig.ListenerSecurityProtocolMapProp,
listenerSecurityProtocolMap.name)
assertFalse(listenerSecurityProtocolMap.isDefault)
assertFalse(listenerSecurityProtocolMap.isSensitive)
@@ -410,18 +412,18 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertTrue(truststorePassword.isSensitive)
assertFalse(truststorePassword.isReadOnly)
val compressionType =
configs.get(brokerResource1).get(KafkaConfig.CompressionTypeProp)
- assertEquals(servers(1).config.compressionType, compressionType.value)
+ assertEquals(brokers(1).config.compressionType, compressionType.value)
assertEquals(KafkaConfig.CompressionTypeProp, compressionType.name)
assertTrue(compressionType.isDefault)
assertFalse(compressionType.isSensitive)
assertFalse(compressionType.isReadOnly)
- assertEquals(servers(2).config.nonInternalValues.size,
configs.get(brokerResource2).entries.size)
- assertEquals(servers(2).config.brokerId.toString,
configs.get(brokerResource2).get(KafkaConfig.BrokerIdProp).value)
- assertEquals(servers(2).config.logCleanerThreads.toString,
+ assertEquals(brokers(2).config.nonInternalValues.size,
configs.get(brokerResource2).entries.size)
+ assertEquals(brokers(2).config.brokerId.toString,
configs.get(brokerResource2).get(KafkaConfig.BrokerIdProp).value)
+ assertEquals(brokers(2).config.logCleanerThreads.toString,
configs.get(brokerResource2).get(KafkaConfig.LogCleanerThreadsProp).value)
- checkValidAlterConfigs(client, topicResource1, topicResource2)
+ checkValidAlterConfigs(client, this, topicResource1, topicResource2)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@@ -968,10 +970,11 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
futures.foreach(_.get)
}
- @Test
- def testInvalidAlterConfigs(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testInvalidAlterConfigs(quorum: String): Unit = {
client = Admin.create(createConfig)
- checkInvalidAlterConfigs(zkClient, servers, client)
+ checkInvalidAlterConfigs(this, client)
}
/**
@@ -1874,7 +1877,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource),
classOf[InvalidConfigurationException],
Some("Invalid value zip for configuration compression.type"))
} else {
-
assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource),
classOf[InvalidRequestException],
+
assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource),
classOf[InvalidConfigurationException],
Some("Invalid config value for resource"))
}
}
@@ -2078,7 +2081,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
).asJava)
assertEquals(Set(topic1Resource).asJava, alterResult.values.keySet)
- assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource),
classOf[InvalidRequestException],
+ assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource),
classOf[InvalidConfigurationException],
Some("Invalid config value for resource"))
}
@@ -2487,7 +2490,12 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
object PlaintextAdminIntegrationTest {
@nowarn("cat=deprecation")
- def checkValidAlterConfigs(client: Admin, topicResource1: ConfigResource,
topicResource2: ConfigResource): Unit = {
+ def checkValidAlterConfigs(
+ admin: Admin,
+ test: KafkaServerTestHarness,
+ topicResource1: ConfigResource,
+ topicResource2: ConfigResource
+ ): Unit = {
// Alter topics
var topicConfigEntries1 = Seq(
new ConfigEntry(LogConfig.FlushMsProp, "1000")
@@ -2498,7 +2506,7 @@ object PlaintextAdminIntegrationTest {
new ConfigEntry(LogConfig.CompressionTypeProp, "lz4")
).asJava
- var alterResult = client.alterConfigs(Map(
+ var alterResult = admin.alterConfigs(Map(
topicResource1 -> new Config(topicConfigEntries1),
topicResource2 -> new Config(topicConfigEntries2)
).asJava)
@@ -2507,7 +2515,8 @@ object PlaintextAdminIntegrationTest {
alterResult.all.get
// Verify that topics were updated correctly
- var describeResult = client.describeConfigs(Seq(topicResource1,
topicResource2).asJava)
+ test.ensureConsistentKRaftMetadata()
+ var describeResult = admin.describeConfigs(Seq(topicResource1,
topicResource2).asJava)
var configs = describeResult.all.get
assertEquals(2, configs.size)
@@ -2530,7 +2539,7 @@ object PlaintextAdminIntegrationTest {
new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.3")
).asJava
- alterResult = client.alterConfigs(Map(
+ alterResult = admin.alterConfigs(Map(
topicResource1 -> new Config(topicConfigEntries1),
topicResource2 -> new Config(topicConfigEntries2)
).asJava, new AlterConfigsOptions().validateOnly(true))
@@ -2539,7 +2548,8 @@ object PlaintextAdminIntegrationTest {
alterResult.all.get
// Verify that topics were not updated due to validateOnly = true
- describeResult = client.describeConfigs(Seq(topicResource1,
topicResource2).asJava)
+ test.ensureConsistentKRaftMetadata()
+ describeResult = admin.describeConfigs(Seq(topicResource1,
topicResource2).asJava)
configs = describeResult.all.get
assertEquals(2, configs.size)
@@ -2550,15 +2560,18 @@ object PlaintextAdminIntegrationTest {
}
@nowarn("cat=deprecation")
- def checkInvalidAlterConfigs(zkClient: KafkaZkClient, servers:
Seq[KafkaServer], client: Admin): Unit = {
+ def checkInvalidAlterConfigs(
+ test: KafkaServerTestHarness,
+ admin: Admin
+ ): Unit = {
// Create topics
val topic1 = "invalid-alter-configs-topic-1"
val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
- TestUtils.createTopic(zkClient, topic1, 1, 1, servers)
+ createTopicWithAdmin(admin, topic1, test.brokers, numPartitions = 1,
replicationFactor = 1)
val topic2 = "invalid-alter-configs-topic-2"
val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
- TestUtils.createTopic(zkClient, topic2, 1, 1, servers)
+ createTopicWithAdmin(admin, topic2, test.brokers, numPartitions = 1,
replicationFactor = 1)
val topicConfigEntries1 = Seq(
new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "1.1"), // this
value is invalid as it's above 1.0
@@ -2567,23 +2580,24 @@ object PlaintextAdminIntegrationTest {
var topicConfigEntries2 = Seq(new
ConfigEntry(LogConfig.CompressionTypeProp, "snappy")).asJava
- val brokerResource = new ConfigResource(ConfigResource.Type.BROKER,
servers.head.config.brokerId.toString)
+ val brokerResource = new ConfigResource(ConfigResource.Type.BROKER,
test.brokers.head.config.brokerId.toString)
val brokerConfigEntries = Seq(new ConfigEntry(KafkaConfig.ZkConnectProp,
"localhost:2181")).asJava
// Alter configs: first and third are invalid, second is valid
- var alterResult = client.alterConfigs(Map(
+ var alterResult = admin.alterConfigs(Map(
topicResource1 -> new Config(topicConfigEntries1),
topicResource2 -> new Config(topicConfigEntries2),
brokerResource -> new Config(brokerConfigEntries)
).asJava)
assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava,
alterResult.values.keySet)
- assertTrue(assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException])
+ assertTrue(assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(topicResource1).get).getCause.isInstanceOf[InvalidConfigurationException])
alterResult.values.get(topicResource2).get
assertTrue(assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
// Verify that first and third resources were not updated and second was
updated
- var describeResult = client.describeConfigs(Seq(topicResource1,
topicResource2, brokerResource).asJava)
+ test.ensureConsistentKRaftMetadata()
+ var describeResult = admin.describeConfigs(Seq(topicResource1,
topicResource2, brokerResource).asJava)
var configs = describeResult.all.get
assertEquals(3, configs.size)
@@ -2599,19 +2613,20 @@ object PlaintextAdminIntegrationTest {
// Alter configs with validateOnly = true: first and third are invalid,
second is valid
topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.CompressionTypeProp,
"gzip")).asJava
- alterResult = client.alterConfigs(Map(
+ alterResult = admin.alterConfigs(Map(
topicResource1 -> new Config(topicConfigEntries1),
topicResource2 -> new Config(topicConfigEntries2),
brokerResource -> new Config(brokerConfigEntries)
).asJava, new AlterConfigsOptions().validateOnly(true))
assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava,
alterResult.values.keySet)
- assertTrue(assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException])
+ assertTrue(assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(topicResource1).get).getCause.isInstanceOf[InvalidConfigurationException])
alterResult.values.get(topicResource2).get
assertTrue(assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
// Verify that no resources are updated since validate_only = true
- describeResult = client.describeConfigs(Seq(topicResource1,
topicResource2, brokerResource).asJava)
+ test.ensureConsistentKRaftMetadata()
+ describeResult = admin.describeConfigs(Seq(topicResource1, topicResource2,
brokerResource).asJava)
configs = describeResult.all.get
assertEquals(3, configs.size)
diff --git
a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index dda1ac3347d..f46713337a7 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -352,4 +352,13 @@ abstract class KafkaServerTestHarness extends
QuorumTestHarness {
)
}
}
+
+ def ensureConsistentKRaftMetadata(): Unit = {
+ if (isKRaftTest()) {
+ TestUtils.ensureConsistentKRaftMetadata(
+ brokers,
+ controllerServer
+ )
+ }
+ }
}
diff --git
a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 875b7605b57..84d6f5a2ef9 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -20,7 +20,7 @@ import java.net.InetAddress
import java.nio.charset.StandardCharsets
import java.util
import java.util.Collections.{singletonList, singletonMap}
-import java.util.Properties
+import java.util.{Collections, Properties}
import java.util.concurrent.ExecutionException
import kafka.integration.KafkaServerTestHarness
@@ -30,7 +30,7 @@ import kafka.server.Constants._
import kafka.zk.ConfigEntityChangeNotificationZNode
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
-import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry}
+import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, Config,
ConfigEntry}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.internals.QuotaConfigs
@@ -438,16 +438,28 @@ class DynamicConfigChangeTest extends
KafkaServerTestHarness {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
- def testConfigureDefaultTopic(quorum: String): Unit = {
+ def testIncrementalAlterDefaultTopicConfig(quorum: String): Unit = {
val admin = createAdminClient()
try {
val resource = new ConfigResource(ConfigResource.Type.TOPIC, "")
val op = new AlterConfigOp(new ConfigEntry(FlushMessagesProp, "200000"),
SET)
- admin.incrementalAlterConfigs(Map(resource ->
List(op).asJavaCollection).asJava).all.get
- fail("Should fail with InvalidRequestException for topic doesn't exist")
- } catch {
- case e: ExecutionException =>
- assertEquals(classOf[InvalidRequestException], e.getCause().getClass())
+ val future = admin.incrementalAlterConfigs(Map(resource ->
List(op).asJavaCollection).asJava).all
+ TestUtils.assertFutureExceptionTypeEquals(future,
classOf[InvalidRequestException])
+ } finally {
+ admin.close()
+ }
+ }
+
+ @nowarn("cat=deprecation")
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAlterDefaultTopicConfig(quorum: String): Unit = {
+ val admin = createAdminClient()
+ try {
+ val resource = new ConfigResource(ConfigResource.Type.TOPIC, "")
+ val config = new Config(Collections.singleton(new
ConfigEntry(FlushMessagesProp, "200000")))
+ val future = admin.alterConfigs(Map(resource -> config).asJava).all
+ TestUtils.assertFutureExceptionTypeEquals(future,
classOf[InvalidRequestException])
} finally {
admin.close()
}