This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1103c76d63a KAFKA-13899: Use INVALID_CONFIG error code consistently in 
AlterConfig APIs (#12162)
1103c76d63a is described below

commit 1103c76d63a6bf14563eac8b5aaec4836061ff7e
Author: Jason Gustafson <[email protected]>
AuthorDate: Mon May 16 17:41:23 2022 -0700

    KAFKA-13899: Use INVALID_CONFIG error code consistently in AlterConfig APIs 
(#12162)
    
    In the AlterConfigs/IncrementalAlterConfigs zk handler, we return 
`INVALID_REQUEST` and `INVALID_CONFIG` inconsistently. The problem is in 
`LogConfig.validate`. We may either return `ConfigException` or 
`InvalidConfigException`. When the first of these is thrown, we catch it and 
convert to `INVALID_REQUEST`. If the latter is thrown, then we return 
`INVALID_CONFIG`. It seems more appropriate to return `INVALID_CONFIG` 
consistently, which is what the KRaft implementation already does th [...]
    
    Reviewers: José Armando García Sancio <[email protected]>
---
 .../main/scala/kafka/server/ZkAdminManager.scala   | 11 ++-
 .../AdminClientWithPoliciesIntegrationTest.scala   | 55 +++++++++-----
 .../kafka/api/PlaintextAdminIntegrationTest.scala  | 83 +++++++++++++---------
 .../kafka/integration/KafkaServerTestHarness.scala |  9 +++
 .../kafka/server/DynamicConfigChangeTest.scala     | 28 +++++---
 5 files changed, 123 insertions(+), 63 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala 
b/core/src/main/scala/kafka/server/ZkAdminManager.scala
index 2852cd141fe..cb6796a09d5 100644
--- a/core/src/main/scala/kafka/server/ZkAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ZkAdminManager.scala
@@ -407,7 +407,7 @@ class ZkAdminManager(val config: KafkaConfig,
         case e @ (_: ConfigException | _: IllegalArgumentException) =>
           val message = s"Invalid config value for resource $resource: 
${e.getMessage}"
           info(message)
-          resource -> ApiError.fromThrowable(new 
InvalidRequestException(message, e))
+          resource -> ApiError.fromThrowable(new 
InvalidConfigurationException(message, e))
         case e: Throwable =>
           val configProps = new Properties
           config.entries.asScala.filter(_.value != null).foreach { configEntry 
=>
@@ -427,6 +427,10 @@ class ZkAdminManager(val config: KafkaConfig,
   private def alterTopicConfigs(resource: ConfigResource, validateOnly: 
Boolean,
                                 configProps: Properties, configEntriesMap: 
Map[String, String]): (ConfigResource, ApiError) = {
     val topic = resource.name
+    if (topic.isEmpty()) {
+      throw new InvalidRequestException("Default topic resources are not 
allowed.")
+    }
+
     if (!metadataCache.contains(topic))
       throw new UnknownTopicOrPartitionException(s"The topic '$topic' does not 
exist.")
 
@@ -489,6 +493,9 @@ class ZkAdminManager(val config: KafkaConfig,
 
         resource.`type` match {
           case ConfigResource.Type.TOPIC =>
+            if (resource.name.isEmpty()) {
+              throw new InvalidRequestException("Default topic resources are 
not allowed.")
+            }
             val configProps = 
adminZkClient.fetchEntityConfig(ConfigType.Topic, resource.name)
             prepareIncrementalConfigs(alterConfigOps, configProps, 
LogConfig.configKeys)
             alterTopicConfigs(resource, validateOnly, configProps, 
configEntriesMap)
@@ -511,7 +518,7 @@ class ZkAdminManager(val config: KafkaConfig,
         case e @ (_: ConfigException | _: IllegalArgumentException) =>
           val message = s"Invalid config value for resource $resource: 
${e.getMessage}"
           info(message)
-          resource -> ApiError.fromThrowable(new 
InvalidRequestException(message, e))
+          resource -> ApiError.fromThrowable(new 
InvalidConfigurationException(message, e))
         case e: Throwable =>
           // Log client errors at a lower level than unexpected exceptions
           val message = s"Error processing alter configs request for resource 
$resource, config $alterConfigOps"
diff --git 
a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
index ab75dc31fb3..fb1b0d248db 100644
--- 
a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
@@ -16,17 +16,20 @@ package kafka.api
 import java.util
 import java.util.Properties
 import java.util.concurrent.ExecutionException
+
 import kafka.integration.KafkaServerTestHarness
 import kafka.log.LogConfig
 import kafka.server.{Defaults, KafkaConfig}
-import kafka.utils.{Logging, TestUtils}
+import kafka.utils.{Logging, TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, 
AlterConfigsOptions, Config, ConfigEntry}
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
-import org.apache.kafka.common.errors.{InvalidRequestException, 
PolicyViolationException}
+import org.apache.kafka.common.errors.{InvalidConfigurationException, 
InvalidRequestException, PolicyViolationException}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.server.policy.AlterConfigPolicy
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, 
assertThrows, assertTrue}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
@@ -45,7 +48,7 @@ class AdminClientWithPoliciesIntegrationTest extends 
KafkaServerTestHarness with
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
-    TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
+    TestUtils.waitUntilBrokerMetadataIsPropagated(brokers)
   }
 
   @AfterEach
@@ -58,14 +61,25 @@ class AdminClientWithPoliciesIntegrationTest extends 
KafkaServerTestHarness with
   def createConfig: util.Map[String, Object] =
     Map[String, Object](AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> 
bootstrapServers()).asJava
 
-  override def generateConfigs = {
-    val configs = TestUtils.createBrokerConfigs(brokerCount, zkConnect)
-    configs.foreach(props => 
props.put(KafkaConfig.AlterConfigPolicyClassNameProp, classOf[Policy]))
+  override def generateConfigs: collection.Seq[KafkaConfig] = {
+    val configs = TestUtils.createBrokerConfigs(brokerCount, zkConnectOrNull)
+    configs.foreach(overrideNodeConfigs)
     configs.map(KafkaConfig.fromProps)
   }
 
-  @Test
-  def testValidAlterConfigs(): Unit = {
+  override def kraftControllerConfigs(): Seq[Properties] = {
+    val props = new Properties()
+    overrideNodeConfigs(props)
+    Seq(props)
+  }
+
+  private def overrideNodeConfigs(props: Properties): Unit = {
+    props.put(KafkaConfig.AlterConfigPolicyClassNameProp, classOf[Policy])
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testValidAlterConfigs(quorum: String): Unit = {
     client = Admin.create(createConfig)
     // Create topics
     val topic1 = "describe-alter-configs-topic-1"
@@ -79,18 +93,20 @@ class AdminClientWithPoliciesIntegrationTest extends 
KafkaServerTestHarness with
     val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
     createTopic(topic2, 1, 1)
 
-    PlaintextAdminIntegrationTest.checkValidAlterConfigs(client, 
topicResource1, topicResource2)
+    PlaintextAdminIntegrationTest.checkValidAlterConfigs(client, this, 
topicResource1, topicResource2)
   }
 
-  @Test
-  def testInvalidAlterConfigs(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testInvalidAlterConfigs(quorum: String): Unit = {
     client = Admin.create(createConfig)
-    PlaintextAdminIntegrationTest.checkInvalidAlterConfigs(zkClient, servers, 
client)
+    PlaintextAdminIntegrationTest.checkInvalidAlterConfigs(this, client)
   }
 
   @nowarn("cat=deprecation")
-  @Test
-  def testInvalidAlterConfigsDueToPolicy(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testInvalidAlterConfigsDueToPolicy(quorum: String): Unit = {
     client = Admin.create(createConfig)
 
     // Create topics
@@ -115,7 +131,7 @@ class AdminClientWithPoliciesIntegrationTest extends 
KafkaServerTestHarness with
 
     val topicConfigEntries3 = Seq(new 
ConfigEntry(LogConfig.MinInSyncReplicasProp, "-1")).asJava
 
-    val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, 
servers.head.config.brokerId.toString)
+    val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, 
brokers.head.config.brokerId.toString)
     val brokerConfigEntries = Seq(new 
ConfigEntry(KafkaConfig.SslTruststorePasswordProp, "12313")).asJava
 
     // Alter configs: second is valid, the others are invalid
@@ -129,10 +145,11 @@ class AdminClientWithPoliciesIntegrationTest extends 
KafkaServerTestHarness with
     assertEquals(Set(topicResource1, topicResource2, topicResource3, 
brokerResource).asJava, alterResult.values.keySet)
     assertTrue(assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException])
     alterResult.values.get(topicResource2).get
-    assertTrue(assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topicResource3).get).getCause.isInstanceOf[InvalidRequestException])
+    assertTrue(assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topicResource3).get).getCause.isInstanceOf[InvalidConfigurationException])
     assertTrue(assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
 
     // Verify that the second resource was updated and the others were not
+    ensureConsistentKRaftMetadata()
     var describeResult = client.describeConfigs(Seq(topicResource1, 
topicResource2, topicResource3, brokerResource).asJava)
     var configs = describeResult.all.get
     assertEquals(4, configs.size)
@@ -157,10 +174,11 @@ class AdminClientWithPoliciesIntegrationTest extends 
KafkaServerTestHarness with
     assertEquals(Set(topicResource1, topicResource2, topicResource3, 
brokerResource).asJava, alterResult.values.keySet)
     assertTrue(assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException])
     alterResult.values.get(topicResource2).get
-    assertTrue(assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topicResource3).get).getCause.isInstanceOf[InvalidRequestException])
+    assertTrue(assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topicResource3).get).getCause.isInstanceOf[InvalidConfigurationException])
     assertTrue(assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
 
     // Verify that no resources are updated since validate_only = true
+    ensureConsistentKRaftMetadata()
     describeResult = client.describeConfigs(Seq(topicResource1, 
topicResource2, topicResource3, brokerResource).asJava)
     configs = describeResult.all.get
     assertEquals(4, configs.size)
@@ -173,7 +191,6 @@ class AdminClientWithPoliciesIntegrationTest extends 
KafkaServerTestHarness with
     
assertNull(configs.get(brokerResource).get(KafkaConfig.SslTruststorePasswordProp).value)
   }
 
-
 }
 
 object AdminClientWithPoliciesIntegrationTest {
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index d6aa7a7e9a2..543b3b80cdc 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -25,12 +25,13 @@ import java.util.concurrent.atomic.{AtomicBoolean, 
AtomicInteger}
 import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
 import java.util.{Collections, Optional, Properties}
 import java.{time, util}
+
+import kafka.integration.KafkaServerTestHarness
 import kafka.log.LogConfig
 import kafka.security.authorizer.AclEntry
-import kafka.server.{Defaults, DynamicConfig, KafkaConfig, KafkaServer}
+import kafka.server.{Defaults, DynamicConfig, KafkaConfig}
 import kafka.utils.TestUtils._
 import kafka.utils.{Log4jController, TestInfoUtils, TestUtils}
-import kafka.zk.KafkaZkClient
 import org.apache.kafka.clients.HostResolver
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource
@@ -353,8 +354,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
   }
 
-  @Test
-  def testDescribeAndAlterConfigs(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeAndAlterConfigs(quorum: String): Unit = {
     client = Admin.create(createConfig)
 
     // Create topics
@@ -370,8 +372,8 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     createTopic(topic2)
 
     // Describe topics and broker
-    val brokerResource1 = new ConfigResource(ConfigResource.Type.BROKER, 
servers(1).config.brokerId.toString)
-    val brokerResource2 = new ConfigResource(ConfigResource.Type.BROKER, 
servers(2).config.brokerId.toString)
+    val brokerResource1 = new ConfigResource(ConfigResource.Type.BROKER, 
brokers(1).config.brokerId.toString)
+    val brokerResource2 = new ConfigResource(ConfigResource.Type.BROKER, 
brokers(2).config.brokerId.toString)
     val configResources = Seq(topicResource1, topicResource2, brokerResource1, 
brokerResource2)
     val describeResult = client.describeConfigs(configResources.asJava)
     val configs = describeResult.all.get
@@ -395,10 +397,10 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     assertFalse(maxMessageBytes2.isSensitive)
     assertFalse(maxMessageBytes2.isReadOnly)
 
-    assertEquals(servers(1).config.nonInternalValues.size, 
configs.get(brokerResource1).entries.size)
-    assertEquals(servers(1).config.brokerId.toString, 
configs.get(brokerResource1).get(KafkaConfig.BrokerIdProp).value)
+    assertEquals(brokers(1).config.nonInternalValues.size, 
configs.get(brokerResource1).entries.size)
+    assertEquals(brokers(1).config.brokerId.toString, 
configs.get(brokerResource1).get(KafkaConfig.BrokerIdProp).value)
     val listenerSecurityProtocolMap = 
configs.get(brokerResource1).get(KafkaConfig.ListenerSecurityProtocolMapProp)
-    
assertEquals(servers(1).config.getString(KafkaConfig.ListenerSecurityProtocolMapProp),
 listenerSecurityProtocolMap.value)
+    
assertEquals(brokers(1).config.getString(KafkaConfig.ListenerSecurityProtocolMapProp),
 listenerSecurityProtocolMap.value)
     assertEquals(KafkaConfig.ListenerSecurityProtocolMapProp, 
listenerSecurityProtocolMap.name)
     assertFalse(listenerSecurityProtocolMap.isDefault)
     assertFalse(listenerSecurityProtocolMap.isSensitive)
@@ -410,18 +412,18 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     assertTrue(truststorePassword.isSensitive)
     assertFalse(truststorePassword.isReadOnly)
     val compressionType = 
configs.get(brokerResource1).get(KafkaConfig.CompressionTypeProp)
-    assertEquals(servers(1).config.compressionType, compressionType.value)
+    assertEquals(brokers(1).config.compressionType, compressionType.value)
     assertEquals(KafkaConfig.CompressionTypeProp, compressionType.name)
     assertTrue(compressionType.isDefault)
     assertFalse(compressionType.isSensitive)
     assertFalse(compressionType.isReadOnly)
 
-    assertEquals(servers(2).config.nonInternalValues.size, 
configs.get(brokerResource2).entries.size)
-    assertEquals(servers(2).config.brokerId.toString, 
configs.get(brokerResource2).get(KafkaConfig.BrokerIdProp).value)
-    assertEquals(servers(2).config.logCleanerThreads.toString,
+    assertEquals(brokers(2).config.nonInternalValues.size, 
configs.get(brokerResource2).entries.size)
+    assertEquals(brokers(2).config.brokerId.toString, 
configs.get(brokerResource2).get(KafkaConfig.BrokerIdProp).value)
+    assertEquals(brokers(2).config.logCleanerThreads.toString,
       
configs.get(brokerResource2).get(KafkaConfig.LogCleanerThreadsProp).value)
 
-    checkValidAlterConfigs(client, topicResource1, topicResource2)
+    checkValidAlterConfigs(client, this, topicResource1, topicResource2)
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@@ -968,10 +970,11 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     futures.foreach(_.get)
   }
 
-  @Test
-  def testInvalidAlterConfigs(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testInvalidAlterConfigs(quorum: String): Unit = {
     client = Admin.create(createConfig)
-    checkInvalidAlterConfigs(zkClient, servers, client)
+    checkInvalidAlterConfigs(this, client)
   }
 
   /**
@@ -1874,7 +1877,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       
assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), 
classOf[InvalidConfigurationException],
         Some("Invalid value zip for configuration compression.type"))
     } else {
-      
assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), 
classOf[InvalidRequestException],
+      
assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), 
classOf[InvalidConfigurationException],
         Some("Invalid config value for resource"))
     }
   }
@@ -2078,7 +2081,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     ).asJava)
     assertEquals(Set(topic1Resource).asJava, alterResult.values.keySet)
 
-    assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), 
classOf[InvalidRequestException],
+    assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), 
classOf[InvalidConfigurationException],
       Some("Invalid config value for resource"))
   }
 
@@ -2487,7 +2490,12 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
 object PlaintextAdminIntegrationTest {
 
   @nowarn("cat=deprecation")
-  def checkValidAlterConfigs(client: Admin, topicResource1: ConfigResource, 
topicResource2: ConfigResource): Unit = {
+  def checkValidAlterConfigs(
+    admin: Admin,
+    test: KafkaServerTestHarness,
+    topicResource1: ConfigResource,
+    topicResource2: ConfigResource
+  ): Unit = {
     // Alter topics
     var topicConfigEntries1 = Seq(
       new ConfigEntry(LogConfig.FlushMsProp, "1000")
@@ -2498,7 +2506,7 @@ object PlaintextAdminIntegrationTest {
       new ConfigEntry(LogConfig.CompressionTypeProp, "lz4")
     ).asJava
 
-    var alterResult = client.alterConfigs(Map(
+    var alterResult = admin.alterConfigs(Map(
       topicResource1 -> new Config(topicConfigEntries1),
       topicResource2 -> new Config(topicConfigEntries2)
     ).asJava)
@@ -2507,7 +2515,8 @@ object PlaintextAdminIntegrationTest {
     alterResult.all.get
 
     // Verify that topics were updated correctly
-    var describeResult = client.describeConfigs(Seq(topicResource1, 
topicResource2).asJava)
+    test.ensureConsistentKRaftMetadata()
+    var describeResult = admin.describeConfigs(Seq(topicResource1, 
topicResource2).asJava)
     var configs = describeResult.all.get
 
     assertEquals(2, configs.size)
@@ -2530,7 +2539,7 @@ object PlaintextAdminIntegrationTest {
       new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.3")
     ).asJava
 
-    alterResult = client.alterConfigs(Map(
+    alterResult = admin.alterConfigs(Map(
       topicResource1 -> new Config(topicConfigEntries1),
       topicResource2 -> new Config(topicConfigEntries2)
     ).asJava, new AlterConfigsOptions().validateOnly(true))
@@ -2539,7 +2548,8 @@ object PlaintextAdminIntegrationTest {
     alterResult.all.get
 
     // Verify that topics were not updated due to validateOnly = true
-    describeResult = client.describeConfigs(Seq(topicResource1, 
topicResource2).asJava)
+    test.ensureConsistentKRaftMetadata()
+    describeResult = admin.describeConfigs(Seq(topicResource1, 
topicResource2).asJava)
     configs = describeResult.all.get
 
     assertEquals(2, configs.size)
@@ -2550,15 +2560,18 @@ object PlaintextAdminIntegrationTest {
   }
 
   @nowarn("cat=deprecation")
-  def checkInvalidAlterConfigs(zkClient: KafkaZkClient, servers: 
Seq[KafkaServer], client: Admin): Unit = {
+  def checkInvalidAlterConfigs(
+    test: KafkaServerTestHarness,
+    admin: Admin
+  ): Unit = {
     // Create topics
     val topic1 = "invalid-alter-configs-topic-1"
     val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
-    TestUtils.createTopic(zkClient, topic1, 1, 1, servers)
+    createTopicWithAdmin(admin, topic1, test.brokers, numPartitions = 1, 
replicationFactor = 1)
 
     val topic2 = "invalid-alter-configs-topic-2"
     val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
-    TestUtils.createTopic(zkClient, topic2, 1, 1, servers)
+    createTopicWithAdmin(admin, topic2, test.brokers, numPartitions = 1, 
replicationFactor = 1)
 
     val topicConfigEntries1 = Seq(
       new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "1.1"), // this 
value is invalid as it's above 1.0
@@ -2567,23 +2580,24 @@ object PlaintextAdminIntegrationTest {
 
     var topicConfigEntries2 = Seq(new 
ConfigEntry(LogConfig.CompressionTypeProp, "snappy")).asJava
 
-    val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, 
servers.head.config.brokerId.toString)
+    val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, 
test.brokers.head.config.brokerId.toString)
     val brokerConfigEntries = Seq(new ConfigEntry(KafkaConfig.ZkConnectProp, 
"localhost:2181")).asJava
 
     // Alter configs: first and third are invalid, second is valid
-    var alterResult = client.alterConfigs(Map(
+    var alterResult = admin.alterConfigs(Map(
       topicResource1 -> new Config(topicConfigEntries1),
       topicResource2 -> new Config(topicConfigEntries2),
       brokerResource -> new Config(brokerConfigEntries)
     ).asJava)
 
     assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, 
alterResult.values.keySet)
-    assertTrue(assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException])
+    assertTrue(assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topicResource1).get).getCause.isInstanceOf[InvalidConfigurationException])
     alterResult.values.get(topicResource2).get
     assertTrue(assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
 
     // Verify that first and third resources were not updated and second was 
updated
-    var describeResult = client.describeConfigs(Seq(topicResource1, 
topicResource2, brokerResource).asJava)
+    test.ensureConsistentKRaftMetadata()
+    var describeResult = admin.describeConfigs(Seq(topicResource1, 
topicResource2, brokerResource).asJava)
     var configs = describeResult.all.get
     assertEquals(3, configs.size)
 
@@ -2599,19 +2613,20 @@ object PlaintextAdminIntegrationTest {
     // Alter configs with validateOnly = true: first and third are invalid, 
second is valid
     topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.CompressionTypeProp, 
"gzip")).asJava
 
-    alterResult = client.alterConfigs(Map(
+    alterResult = admin.alterConfigs(Map(
       topicResource1 -> new Config(topicConfigEntries1),
       topicResource2 -> new Config(topicConfigEntries2),
       brokerResource -> new Config(brokerConfigEntries)
     ).asJava, new AlterConfigsOptions().validateOnly(true))
 
     assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, 
alterResult.values.keySet)
-    assertTrue(assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException])
+    assertTrue(assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topicResource1).get).getCause.isInstanceOf[InvalidConfigurationException])
     alterResult.values.get(topicResource2).get
     assertTrue(assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
 
     // Verify that no resources are updated since validate_only = true
-    describeResult = client.describeConfigs(Seq(topicResource1, 
topicResource2, brokerResource).asJava)
+    test.ensureConsistentKRaftMetadata()
+    describeResult = admin.describeConfigs(Seq(topicResource1, topicResource2, 
brokerResource).asJava)
     configs = describeResult.all.get
     assertEquals(3, configs.size)
 
diff --git 
a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala 
b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index dda1ac3347d..f46713337a7 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -352,4 +352,13 @@ abstract class KafkaServerTestHarness extends 
QuorumTestHarness {
       )
     }
   }
+
+  def ensureConsistentKRaftMetadata(): Unit = {
+    if (isKRaftTest()) {
+      TestUtils.ensureConsistentKRaftMetadata(
+        brokers,
+        controllerServer
+      )
+    }
+  }
 }
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 875b7605b57..84d6f5a2ef9 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -20,7 +20,7 @@ import java.net.InetAddress
 import java.nio.charset.StandardCharsets
 import java.util
 import java.util.Collections.{singletonList, singletonMap}
-import java.util.Properties
+import java.util.{Collections, Properties}
 import java.util.concurrent.ExecutionException
 
 import kafka.integration.KafkaServerTestHarness
@@ -30,7 +30,7 @@ import kafka.server.Constants._
 import kafka.zk.ConfigEntityChangeNotificationZNode
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
-import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry}
+import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, Config, 
ConfigEntry}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.internals.QuotaConfigs
@@ -438,16 +438,28 @@ class DynamicConfigChangeTest extends 
KafkaServerTestHarness {
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testConfigureDefaultTopic(quorum: String): Unit = {
+  def testIncrementalAlterDefaultTopicConfig(quorum: String): Unit = {
     val admin = createAdminClient()
     try {
       val resource = new ConfigResource(ConfigResource.Type.TOPIC, "")
       val op = new AlterConfigOp(new ConfigEntry(FlushMessagesProp, "200000"), 
SET)
-      admin.incrementalAlterConfigs(Map(resource -> 
List(op).asJavaCollection).asJava).all.get
-      fail("Should fail with InvalidRequestException for topic doesn't exist")
-    } catch {
-      case e: ExecutionException =>
-        assertEquals(classOf[InvalidRequestException], e.getCause().getClass())
+      val future = admin.incrementalAlterConfigs(Map(resource -> 
List(op).asJavaCollection).asJava).all
+      TestUtils.assertFutureExceptionTypeEquals(future, 
classOf[InvalidRequestException])
+    } finally {
+      admin.close()
+    }
+  }
+
+  @nowarn("cat=deprecation")
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAlterDefaultTopicConfig(quorum: String): Unit = {
+    val admin = createAdminClient()
+    try {
+      val resource = new ConfigResource(ConfigResource.Type.TOPIC, "")
+      val config = new Config(Collections.singleton(new 
ConfigEntry(FlushMessagesProp, "200000")))
+      val future = admin.alterConfigs(Map(resource -> config).asJava).all
+      TestUtils.assertFutureExceptionTypeEquals(future, 
classOf[InvalidRequestException])
     } finally {
       admin.close()
     }

Reply via email to