This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 77cd827104 MINOR: Move some TopicCommand and ConfigCommand integration 
tests to unit tests (#12024)
77cd827104 is described below

commit 77cd8271047d5d099dde104e912161e0787a0f18
Author: dengziming <[email protected]>
AuthorDate: Fri Apr 15 16:35:52 2022 +0800

    MINOR: Move some TopicCommand and ConfigCommand integration tests to unit 
tests (#12024)
    
    Move some TopicCommand and ConfigCommand integration tests to unit tests to 
speed up the tests
    
    Reviewers: Luke Chen <[email protected]>
---
 .../kafka/admin/ConfigCommandIntegrationTest.scala | 175 ++++++++++++++++++++
 .../kafka/admin/TopicCommandIntegrationTest.scala  |  79 +--------
 .../scala/unit/kafka/admin/ConfigCommandTest.scala | 176 ++++-----------------
 .../scala/unit/kafka/admin/TopicCommandTest.scala  |  80 +++++++++-
 4 files changed, 289 insertions(+), 221 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/admin/ConfigCommandIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/admin/ConfigCommandIntegrationTest.scala
new file mode 100644
index 0000000000..db7ab91729
--- /dev/null
+++ 
b/core/src/test/scala/integration/kafka/admin/ConfigCommandIntegrationTest.scala
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import kafka.admin.ConfigCommand.ConfigCommandOptions
+import kafka.api.ApiVersion
+import kafka.cluster.{Broker, EndPoint}
+import kafka.server.{ConfigEntityName, KafkaConfig, QuorumTestHarness}
+import kafka.utils.{Exit, Logging}
+import kafka.zk.{AdminZkClient, BrokerInfo}
+import org.apache.kafka.common.config.ConfigException
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+class ConfigCommandIntegrationTest extends QuorumTestHarness with Logging {
+
+  @Test
+  def shouldExitWithNonZeroStatusOnUpdatingUnallowedConfigViaZk(): Unit = {
+    assertNonZeroStatusExit(Array(
+      "--zookeeper", zkConnect,
+      "--entity-name", "1",
+      "--entity-type", "brokers",
+      "--alter",
+      "--add-config", "security.inter.broker.protocol=PLAINTEXT"))
+  }
+
+  @Test
+  def shouldExitWithNonZeroStatusOnZkCommandAlterUserQuota(): Unit = {
+    assertNonZeroStatusExit(Array(
+      "--zookeeper", zkConnect,
+      "--entity-type", "users",
+      "--entity-name", "admin",
+      "--alter", "--add-config", "consumer_byte_rate=20000"))
+  }
+
+  private def assertNonZeroStatusExit(args: Array[String]): Unit = {
+    var exitStatus: Option[Int] = None
+    Exit.setExitProcedure { (status, _) =>
+      exitStatus = Some(status)
+      throw new RuntimeException
+    }
+
+    try {
+      ConfigCommand.main(args)
+    } catch {
+      case _: RuntimeException =>
+    } finally {
+      Exit.resetExitProcedure()
+    }
+
+    assertEquals(Some(1), exitStatus)
+  }
+
+  @Test
+  def testDynamicBrokerConfigUpdateUsingZooKeeper(): Unit = {
+    val brokerId = "1"
+    val adminZkClient = new AdminZkClient(zkClient)
+    val alterOpts = Array("--zookeeper", zkConnect, "--entity-type", 
"brokers", "--alter")
+
+    def entityOpt(brokerId: Option[String]): Array[String] = {
+      brokerId.map(id => Array("--entity-name", 
id)).getOrElse(Array("--entity-default"))
+    }
+
+    def alterConfigWithZk(configs: Map[String, String], brokerId: 
Option[String],
+                          encoderConfigs: Map[String, String] = Map.empty): 
Unit = {
+      val configStr = (configs ++ encoderConfigs).map { case (k, v) => 
s"$k=$v" }.mkString(",")
+      val addOpts = new ConfigCommandOptions(alterOpts ++ entityOpt(brokerId) 
++ Array("--add-config", configStr))
+      ConfigCommand.alterConfigWithZk(zkClient, addOpts, adminZkClient)
+    }
+
+    def verifyConfig(configs: Map[String, String], brokerId: Option[String]): 
Unit = {
+      val entityConfigs = zkClient.getEntityConfigs("brokers", 
brokerId.getOrElse(ConfigEntityName.Default))
+      assertEquals(configs, entityConfigs.asScala)
+    }
+
+    def alterAndVerifyConfig(configs: Map[String, String], brokerId: 
Option[String]): Unit = {
+      alterConfigWithZk(configs, brokerId)
+      verifyConfig(configs, brokerId)
+    }
+
+    def deleteAndVerifyConfig(configNames: Set[String], brokerId: 
Option[String]): Unit = {
+      val deleteOpts = new ConfigCommandOptions(alterOpts ++ 
entityOpt(brokerId) ++
+        Array("--delete-config", configNames.mkString(",")))
+      ConfigCommand.alterConfigWithZk(zkClient, deleteOpts, adminZkClient)
+      verifyConfig(Map.empty, brokerId)
+    }
+
+    // Add config
+    alterAndVerifyConfig(Map("message.max.size" -> "110000"), Some(brokerId))
+    alterAndVerifyConfig(Map("message.max.size" -> "120000"), None)
+
+    // Change config
+    alterAndVerifyConfig(Map("message.max.size" -> "130000"), Some(brokerId))
+    alterAndVerifyConfig(Map("message.max.size" -> "140000"), None)
+
+    // Delete config
+    deleteAndVerifyConfig(Set("message.max.size"), Some(brokerId))
+    deleteAndVerifyConfig(Set("message.max.size"), None)
+
+    // Listener configs: should work only with listener name
+    alterAndVerifyConfig(Map("listener.name.external.ssl.keystore.location" -> 
"/tmp/test.jks"), Some(brokerId))
+    assertThrows(classOf[ConfigException], () => 
alterConfigWithZk(Map("ssl.keystore.location" -> "/tmp/test.jks"), 
Some(brokerId)))
+
+    // Per-broker config configured at default cluster-level should fail
+    assertThrows(classOf[ConfigException], () => 
alterConfigWithZk(Map("listener.name.external.ssl.keystore.location" -> 
"/tmp/test.jks"), None))
+    deleteAndVerifyConfig(Set("listener.name.external.ssl.keystore.location"), 
Some(brokerId))
+
+    // Password config update without encoder secret should fail
+    assertThrows(classOf[IllegalArgumentException], () => 
alterConfigWithZk(Map("listener.name.external.ssl.keystore.password" -> 
"secret"), Some(brokerId)))
+
+    // Password config update with encoder secret should succeed and encoded 
password must be stored in ZK
+    val configs = Map("listener.name.external.ssl.keystore.password" -> 
"secret", "log.cleaner.threads" -> "2")
+    val encoderConfigs = Map(KafkaConfig.PasswordEncoderSecretProp -> 
"encoder-secret")
+    alterConfigWithZk(configs, Some(brokerId), encoderConfigs)
+    val brokerConfigs = zkClient.getEntityConfigs("brokers", brokerId)
+    assertFalse(brokerConfigs.contains(KafkaConfig.PasswordEncoderSecretProp), 
"Encoder secret stored in ZooKeeper")
+    assertEquals("2", brokerConfigs.getProperty("log.cleaner.threads")) // not 
encoded
+    val encodedPassword = 
brokerConfigs.getProperty("listener.name.external.ssl.keystore.password")
+    val passwordEncoder = ConfigCommand.createPasswordEncoder(encoderConfigs)
+    assertEquals("secret", passwordEncoder.decode(encodedPassword).value)
+    assertEquals(configs.size, brokerConfigs.size)
+
+    // Password config update with overrides for encoder parameters
+    val configs2 = Map("listener.name.internal.ssl.keystore.password" -> 
"secret2")
+    val encoderConfigs2 = Map(KafkaConfig.PasswordEncoderSecretProp -> 
"encoder-secret",
+      KafkaConfig.PasswordEncoderCipherAlgorithmProp -> "DES/CBC/PKCS5Padding",
+      KafkaConfig.PasswordEncoderIterationsProp -> "1024",
+      KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp -> 
"PBKDF2WithHmacSHA1",
+      KafkaConfig.PasswordEncoderKeyLengthProp -> "64")
+    alterConfigWithZk(configs2, Some(brokerId), encoderConfigs2)
+    val brokerConfigs2 = zkClient.getEntityConfigs("brokers", brokerId)
+    val encodedPassword2 = 
brokerConfigs2.getProperty("listener.name.internal.ssl.keystore.password")
+    assertEquals("secret2", 
ConfigCommand.createPasswordEncoder(encoderConfigs).decode(encodedPassword2).value)
+    assertEquals("secret2", 
ConfigCommand.createPasswordEncoder(encoderConfigs2).decode(encodedPassword2).value)
+
+
+    // Password config update at default cluster-level should fail
+    assertThrows(classOf[ConfigException], () => alterConfigWithZk(configs, 
None, encoderConfigs))
+
+    // Dynamic config updates using ZK should fail if broker is running.
+    registerBrokerInZk(brokerId.toInt)
+    assertThrows(classOf[IllegalArgumentException], () => 
alterConfigWithZk(Map("message.max.size" -> "210000"), Some(brokerId)))
+    assertThrows(classOf[IllegalArgumentException], () => 
alterConfigWithZk(Map("message.max.size" -> "220000"), None))
+
+    // Dynamic config updates using ZK for a different broker that is 
not running should succeed
+    alterAndVerifyConfig(Map("message.max.size" -> "230000"), Some("2"))
+  }
+
+  private def registerBrokerInZk(id: Int): Unit = {
+    zkClient.createTopLevelPaths()
+    val securityProtocol = SecurityProtocol.PLAINTEXT
+    val endpoint = new EndPoint("localhost", 9092, 
ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
+    val brokerInfo = BrokerInfo(Broker(id, Seq(endpoint), rack = None), 
ApiVersion.latestVersion, jmxPort = 9192)
+    zkClient.registerBroker(brokerInfo)
+  }
+}
diff --git 
a/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
similarity index 90%
rename from 
core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala
rename to 
core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
index ee7e64957e..9abf7487d7 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
@@ -16,7 +16,7 @@
   */
 package kafka.admin
 
-import java.util.{Collection, Collections, Optional, Properties}
+import java.util.{Collections, Optional, Properties}
 import kafka.admin.TopicCommand.{TopicCommandOptions, TopicService}
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
@@ -24,19 +24,17 @@ import kafka.utils.{Logging, TestInfoUtils, TestUtils}
 import kafka.zk.{ConfigEntityChangeNotificationZNode, DeleteTopicsTopicZNode}
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin._
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.{ConfigException, ConfigResource, 
TopicConfig}
-import org.apache.kafka.common.errors.{ClusterAuthorizationException, 
InvalidTopicException, ThrottlingQuotaExceededException, TopicExistsException}
+import org.apache.kafka.common.errors.{ClusterAuthorizationException, 
InvalidTopicException, TopicExistsException}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.{Node, TopicPartition, TopicPartitionInfo}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
-import org.mockito.ArgumentMatcher
-import org.mockito.ArgumentMatchers.{eq => eqThat, _}
 import org.mockito.Mockito._
 
 import scala.collection.Seq
@@ -812,75 +810,4 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
     assertThrows(classOf[InvalidTopicException],
       () => topicService.createTopic(new TopicCommandOptions(Array("--topic", 
"foo.bar"))))
   }
-
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testCreateTopicDoesNotRetryThrottlingQuotaExceededException(quorum: 
String): Unit = {
-    val adminClient = mock(classOf[Admin])
-    val topicService = TopicService(adminClient)
-
-    val result = AdminClientTestUtils.createTopicsResult(testTopicName, 
Errors.THROTTLING_QUOTA_EXCEEDED.exception())
-    when(adminClient.createTopics(any(), any())).thenReturn(result)
-
-    assertThrows(classOf[ThrottlingQuotaExceededException],
-      () => topicService.createTopic(new TopicCommandOptions(Array("--topic", 
testTopicName))))
-
-    val expectedNewTopic = new NewTopic(testTopicName, 
Optional.empty[Integer](), Optional.empty[java.lang.Short]())
-      .configs(Map.empty[String, String].asJava)
-
-    verify(adminClient, times(1)).createTopics(
-      eqThat(Set(expectedNewTopic).asJava),
-      argThat((_.shouldRetryOnQuotaViolation() == false): 
ArgumentMatcher[CreateTopicsOptions])
-    )
-  }
-
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testDeleteTopicDoesNotRetryThrottlingQuotaExceededException(quorum: 
String): Unit = {
-    val adminClient = mock(classOf[Admin])
-    val topicService = TopicService(adminClient)
-
-    val listResult = AdminClientTestUtils.listTopicsResult(testTopicName)
-    when(adminClient.listTopics(any())).thenReturn(listResult)
-
-    val result = AdminClientTestUtils.deleteTopicsResult(testTopicName, 
Errors.THROTTLING_QUOTA_EXCEEDED.exception())
-    when(adminClient.deleteTopics(any[Collection[String]](), 
any())).thenReturn(result)
-
-    val exception = assertThrows(classOf[ExecutionException],
-      () => topicService.deleteTopic(new TopicCommandOptions(Array("--topic", 
testTopicName))))
-    
assertTrue(exception.getCause.isInstanceOf[ThrottlingQuotaExceededException])
-
-    verify(adminClient, times(1)).deleteTopics(
-      eqThat(Seq(testTopicName).asJavaCollection),
-      argThat((_.shouldRetryOnQuotaViolation() == false): 
ArgumentMatcher[DeleteTopicsOptions])
-    )
-  }
-
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testCreatePartitionsDoesNotRetryThrottlingQuotaExceededException(quorum: 
String): Unit = {
-    val adminClient = mock(classOf[Admin])
-    val topicService = TopicService(adminClient)
-
-    val listResult = AdminClientTestUtils.listTopicsResult(testTopicName)
-    when(adminClient.listTopics(any())).thenReturn(listResult)
-
-    val topicPartitionInfo = new TopicPartitionInfo(0, new Node(0, "", 0),
-      Collections.emptyList(), Collections.emptyList())
-    val describeResult = 
AdminClientTestUtils.describeTopicsResult(testTopicName, new TopicDescription(
-      testTopicName, false, Collections.singletonList(topicPartitionInfo)))
-    
when(adminClient.describeTopics(any(classOf[java.util.Collection[String]]))).thenReturn(describeResult)
-
-    val result = AdminClientTestUtils.createPartitionsResult(testTopicName, 
Errors.THROTTLING_QUOTA_EXCEEDED.exception())
-    when(adminClient.createPartitions(any(), any())).thenReturn(result)
-
-    val exception = assertThrows(classOf[ExecutionException],
-      () => topicService.alterTopic(new TopicCommandOptions(Array("--topic", 
testTopicName, "--partitions", "3"))))
-    
assertTrue(exception.getCause.isInstanceOf[ThrottlingQuotaExceededException])
-
-    verify(adminClient, times(1)).createPartitions(
-      argThat((_.get(testTopicName).totalCount() == 3): 
ArgumentMatcher[java.util.Map[String, NewPartitions]]),
-      argThat((_.shouldRetryOnQuotaViolation() == false): 
ArgumentMatcher[CreatePartitionsOptions])
-    )
-  }
 }
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala 
b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index c7ccf822b9..8516ba3a24 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -19,46 +19,37 @@ package kafka.admin
 import java.util
 import java.util.Properties
 import kafka.admin.ConfigCommand.ConfigCommandOptions
-import kafka.api.ApiVersion
-import kafka.cluster.{Broker, EndPoint}
-import kafka.server.{ConfigEntityName, ConfigType, KafkaConfig, 
QuorumTestHarness}
+import kafka.cluster.Broker
+import kafka.server.{ConfigEntityName, ConfigType}
 import kafka.utils.{Exit, Logging}
-import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient}
+import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.common.Node
-import org.apache.kafka.common.config.{ConfigException, ConfigResource}
+import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors.InvalidConfigurationException
 import org.apache.kafka.common.internals.KafkaFutureImpl
-import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, 
ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
-import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
 import org.apache.kafka.common.utils.Sanitizer
 import org.apache.kafka.test.TestUtils
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
+import org.mockito.ArgumentMatchers.anyString
 import org.mockito.Mockito.{mock, times, verify, when}
 
 import scala.collection.{Seq, mutable}
 import scala.jdk.CollectionConverters._
 
-class ConfigCommandTest extends QuorumTestHarness with Logging {
+class ConfigCommandTest extends Logging {
+
+  private val zkConnect = "localhost:2181"
+  private val dummyAdminZkClient = new DummyAdminZkClient(null)
 
   @Test
   def shouldExitWithNonZeroStatusOnArgError(): Unit = {
     assertNonZeroStatusExit(Array("--blah"))
   }
 
-  @Test
-  def shouldExitWithNonZeroStatusOnUpdatingUnallowedConfigViaZk(): Unit = {
-    assertNonZeroStatusExit(Array(
-      "--zookeeper", zkConnect,
-      "--entity-name", "1",
-      "--entity-type", "brokers",
-      "--alter",
-      "--add-config", "security.inter.broker.protocol=PLAINTEXT"))
-  }
-
   @Test
   def shouldExitWithNonZeroStatusOnZkCommandWithTopicsEntity(): Unit = {
     assertNonZeroStatusExit(Array(
@@ -83,15 +74,6 @@ class ConfigCommandTest extends QuorumTestHarness with 
Logging {
       "--describe"))
   }
 
-  @Test
-  def shouldExitWithNonZeroStatusOnZkCommandAlterUserQuota(): Unit = {
-    assertNonZeroStatusExit(Array(
-      "--zookeeper", zkConnect,
-      "--entity-type", "users",
-      "--entity-name", "admin",
-      "--alter", "--add-config", "consumer_byte_rate=20000"))
-  }
-
   @Test
   def shouldExitWithNonZeroStatusAlterUserQuotaWithoutEntityName(): Unit = {
     assertNonZeroStatusExit(Array(
@@ -100,7 +82,6 @@ class ConfigCommandTest extends QuorumTestHarness with 
Logging {
       "--alter", "--add-config", "consumer_byte_rate=20000"))
   }
 
-
   @Test
   def shouldExitWithNonZeroStatusOnBrokerCommandError(): Unit = {
     assertNonZeroStatusExit(Array(
@@ -391,7 +372,7 @@ class ConfigCommandTest extends QuorumTestHarness with 
Logging {
   def shouldFailIfUnrecognisedEntityTypeUsingZookeeper(): Unit = {
     val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
       "--entity-name", "client", "--entity-type", "not-recognised", "--alter", 
"--add-config", "a=b,c=d"))
-    assertThrows(classOf[IllegalArgumentException], () => 
ConfigCommand.alterConfigWithZk(null, createOpts, new 
DummyAdminZkClient(zkClient)))
+    assertThrows(classOf[IllegalArgumentException], () => 
ConfigCommand.alterConfigWithZk(null, createOpts, dummyAdminZkClient))
   }
 
   @Test
@@ -405,7 +386,7 @@ class ConfigCommandTest extends QuorumTestHarness with 
Logging {
   def shouldFailIfBrokerEntityTypeIsNotAnIntegerUsingZookeeper(): Unit = {
     val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
       "--entity-name", "A", "--entity-type", "brokers", "--alter", 
"--add-config", "a=b,c=d"))
-    assertThrows(classOf[IllegalArgumentException], () => 
ConfigCommand.alterConfigWithZk(null, createOpts, new 
DummyAdminZkClient(zkClient)))
+    assertThrows(classOf[IllegalArgumentException], () => 
ConfigCommand.alterConfigWithZk(null, createOpts, dummyAdminZkClient))
   }
 
   @Test
@@ -419,7 +400,7 @@ class ConfigCommandTest extends QuorumTestHarness with 
Logging {
   def shouldFailIfShortBrokerEntityTypeIsNotAnIntegerUsingZookeeper(): Unit = {
     val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
       "--broker", "A", "--alter", "--add-config", "a=b,c=d"))
-    assertThrows(classOf[IllegalArgumentException], () => 
ConfigCommand.alterConfigWithZk(null, createOpts, new 
DummyAdminZkClient(zkClient)))
+    assertThrows(classOf[IllegalArgumentException], () => 
ConfigCommand.alterConfigWithZk(null, createOpts, dummyAdminZkClient))
   }
 
   @Test
@@ -479,6 +460,9 @@ class ConfigCommandTest extends QuorumTestHarness with 
Logging {
       "--alter",
       "--add-config", "a=b,c=d"))
 
+    val zkClient = mock(classOf[KafkaZkClient])
+    when(zkClient.getEntityConfigs(anyString(), anyString())).thenReturn(new 
Properties())
+
     class TestAdminZkClient(zkClient: KafkaZkClient) extends 
AdminZkClient(zkClient) {
       override def changeClientIdConfig(clientId: String, configChange: 
Properties): Unit = {
         assertEquals("my-client-id", clientId)
@@ -498,6 +482,9 @@ class ConfigCommandTest extends QuorumTestHarness with 
Logging {
       "--alter",
       "--add-config", "a=b,c=d"))
 
+    val zkClient = mock(classOf[KafkaZkClient])
+    when(zkClient.getEntityConfigs(anyString(), anyString())).thenReturn(new 
Properties())
+
     class TestAdminZkClient(zkClient: KafkaZkClient) extends 
AdminZkClient(zkClient) {
       override def changeIpConfig(ip: String, configChange: Properties): Unit 
= {
         assertEquals("1.2.3.4", ip)
@@ -774,6 +761,9 @@ class ConfigCommandTest extends QuorumTestHarness with 
Logging {
       "--alter",
       "--add-config", "a=b,c=d"))
 
+    val zkClient = mock(classOf[KafkaZkClient])
+    when(zkClient.getEntityConfigs(anyString(), anyString())).thenReturn(new 
Properties())
+
     class TestAdminZkClient(zkClient: KafkaZkClient) extends 
AdminZkClient(zkClient) {
       override def changeTopicConfig(topic: String, configChange: Properties): 
Unit = {
         assertEquals("my-topic", topic)
@@ -909,7 +899,7 @@ class ConfigCommandTest extends QuorumTestHarness with 
Logging {
     when(mockZkClient.getBroker(1)).thenReturn(Option(mockBroker))
 
     assertThrows(classOf[IllegalArgumentException],
-      () => ConfigCommand.alterConfigWithZk(mockZkClient, alterOpts, new 
DummyAdminZkClient(zkClient)))
+      () => ConfigCommand.alterConfigWithZk(mockZkClient, alterOpts, 
dummyAdminZkClient))
   }
 
   @Test
@@ -924,7 +914,7 @@ class ConfigCommandTest extends QuorumTestHarness with 
Logging {
     when(mockZkClient.getBroker(1)).thenReturn(Option(mockBroker))
 
     assertThrows(classOf[IllegalArgumentException],
-      () => ConfigCommand.describeConfigWithZk(mockZkClient, describeOpts, new 
DummyAdminZkClient(zkClient)))
+      () => ConfigCommand.describeConfigWithZk(mockZkClient, describeOpts, 
dummyAdminZkClient))
   }
 
   @Test
@@ -946,7 +936,7 @@ class ConfigCommandTest extends QuorumTestHarness with 
Logging {
     val mockZkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
     when(mockZkClient.getBroker(1)).thenReturn(None)
 
-    ConfigCommand.describeConfigWithZk(mockZkClient, describeOpts, new 
TestAdminZkClient(zkClient))
+    ConfigCommand.describeConfigWithZk(mockZkClient, describeOpts, new 
TestAdminZkClient(null))
   }
 
   @Test
@@ -1197,6 +1187,9 @@ class ConfigCommandTest extends QuorumTestHarness with 
Logging {
       "--alter",
       "--add-config", "a=b,c=[d,e ,f],g=[h,i]"))
 
+    val zkClient = mock(classOf[KafkaZkClient])
+    when(zkClient.getEntityConfigs(anyString(), anyString())).thenReturn(new 
Properties())
+
     class TestAdminZkClient(zkClient: KafkaZkClient) extends 
AdminZkClient(zkClient) {
       override def changeTopicConfig(topic: String, configChange: Properties): 
Unit = {
         assertEquals("my-topic", topic)
@@ -1216,7 +1209,7 @@ class ConfigCommandTest extends QuorumTestHarness with 
Logging {
       "--entity-type", "brokers",
       "--alter",
       "--add-config", "leader.replication.throttled.rate=10"))
-    assertThrows(classOf[IllegalArgumentException], () => 
ConfigCommand.alterConfigWithZk(null, createOpts, new 
DummyAdminZkClient(zkClient)))
+    assertThrows(classOf[IllegalArgumentException], () => 
ConfigCommand.alterConfigWithZk(null, createOpts, dummyAdminZkClient))
   }
 
   @Test
@@ -1229,101 +1222,6 @@ class ConfigCommandTest extends QuorumTestHarness with 
Logging {
     assertThrows(classOf[IllegalArgumentException], () => 
ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), 
createOpts))
   }
 
-  @Test
-  def testDynamicBrokerConfigUpdateUsingZooKeeper(): Unit = {
-    val brokerId = "1"
-    val adminZkClient = new AdminZkClient(zkClient)
-    val alterOpts = Array("--zookeeper", zkConnect, "--entity-type", 
"brokers", "--alter")
-
-    def entityOpt(brokerId: Option[String]): Array[String] = {
-      brokerId.map(id => Array("--entity-name", 
id)).getOrElse(Array("--entity-default"))
-    }
-
-    def alterConfigWithZk(configs: Map[String, String], brokerId: 
Option[String],
-                          encoderConfigs: Map[String, String] = Map.empty): 
Unit = {
-      val configStr = (configs ++ encoderConfigs).map { case (k, v) => 
s"$k=$v" }.mkString(",")
-      val addOpts = new ConfigCommandOptions(alterOpts ++ entityOpt(brokerId) 
++ Array("--add-config", configStr))
-      ConfigCommand.alterConfigWithZk(zkClient, addOpts, adminZkClient)
-    }
-
-    def verifyConfig(configs: Map[String, String], brokerId: Option[String]): 
Unit = {
-      val entityConfigs = zkClient.getEntityConfigs("brokers", 
brokerId.getOrElse(ConfigEntityName.Default))
-      assertEquals(configs, entityConfigs.asScala)
-    }
-
-    def alterAndVerifyConfig(configs: Map[String, String], brokerId: 
Option[String]): Unit = {
-      alterConfigWithZk(configs, brokerId)
-      verifyConfig(configs, brokerId)
-    }
-
-    def deleteAndVerifyConfig(configNames: Set[String], brokerId: 
Option[String]): Unit = {
-      val deleteOpts = new ConfigCommandOptions(alterOpts ++ 
entityOpt(brokerId) ++
-        Array("--delete-config", configNames.mkString(",")))
-      ConfigCommand.alterConfigWithZk(zkClient, deleteOpts, adminZkClient)
-      verifyConfig(Map.empty, brokerId)
-    }
-
-    // Add config
-    alterAndVerifyConfig(Map("message.max.size" -> "110000"), Some(brokerId))
-    alterAndVerifyConfig(Map("message.max.size" -> "120000"), None)
-
-    // Change config
-    alterAndVerifyConfig(Map("message.max.size" -> "130000"), Some(brokerId))
-    alterAndVerifyConfig(Map("message.max.size" -> "140000"), None)
-
-    // Delete config
-    deleteAndVerifyConfig(Set("message.max.size"), Some(brokerId))
-    deleteAndVerifyConfig(Set("message.max.size"), None)
-
-    // Listener configs: should work only with listener name
-    alterAndVerifyConfig(Map("listener.name.external.ssl.keystore.location" -> 
"/tmp/test.jks"), Some(brokerId))
-    assertThrows(classOf[ConfigException], () => 
alterConfigWithZk(Map("ssl.keystore.location" -> "/tmp/test.jks"), 
Some(brokerId)))
-
-    // Per-broker config configured at default cluster-level should fail
-    assertThrows(classOf[ConfigException], () => 
alterConfigWithZk(Map("listener.name.external.ssl.keystore.location" -> 
"/tmp/test.jks"), None))
-    deleteAndVerifyConfig(Set("listener.name.external.ssl.keystore.location"), 
Some(brokerId))
-
-    // Password config update without encoder secret should fail
-    assertThrows(classOf[IllegalArgumentException], () => 
alterConfigWithZk(Map("listener.name.external.ssl.keystore.password" -> 
"secret"), Some(brokerId)))
-
-    // Password config update with encoder secret should succeed and encoded 
password must be stored in ZK
-    val configs = Map("listener.name.external.ssl.keystore.password" -> 
"secret", "log.cleaner.threads" -> "2")
-    val encoderConfigs = Map(KafkaConfig.PasswordEncoderSecretProp -> 
"encoder-secret")
-    alterConfigWithZk(configs, Some(brokerId), encoderConfigs)
-    val brokerConfigs = zkClient.getEntityConfigs("brokers", brokerId)
-    assertFalse(brokerConfigs.contains(KafkaConfig.PasswordEncoderSecretProp), 
"Encoder secret stored in ZooKeeper")
-    assertEquals("2", brokerConfigs.getProperty("log.cleaner.threads")) // not 
encoded
-    val encodedPassword = 
brokerConfigs.getProperty("listener.name.external.ssl.keystore.password")
-    val passwordEncoder = ConfigCommand.createPasswordEncoder(encoderConfigs)
-    assertEquals("secret", passwordEncoder.decode(encodedPassword).value)
-    assertEquals(configs.size, brokerConfigs.size)
-
-    // Password config update with overrides for encoder parameters
-    val configs2 = Map("listener.name.internal.ssl.keystore.password" -> 
"secret2")
-    val encoderConfigs2 = Map(KafkaConfig.PasswordEncoderSecretProp -> 
"encoder-secret",
-      KafkaConfig.PasswordEncoderCipherAlgorithmProp -> "DES/CBC/PKCS5Padding",
-      KafkaConfig.PasswordEncoderIterationsProp -> "1024",
-      KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp -> 
"PBKDF2WithHmacSHA1",
-      KafkaConfig.PasswordEncoderKeyLengthProp -> "64")
-    alterConfigWithZk(configs2, Some(brokerId), encoderConfigs2)
-    val brokerConfigs2 = zkClient.getEntityConfigs("brokers", brokerId)
-    val encodedPassword2 = 
brokerConfigs2.getProperty("listener.name.internal.ssl.keystore.password")
-    assertEquals("secret2", 
ConfigCommand.createPasswordEncoder(encoderConfigs).decode(encodedPassword2).value)
-    assertEquals("secret2", 
ConfigCommand.createPasswordEncoder(encoderConfigs2).decode(encodedPassword2).value)
-
-
-    // Password config update at default cluster-level should fail
-    assertThrows(classOf[ConfigException], () => alterConfigWithZk(configs, 
None, encoderConfigs))
-
-    // Dynamic config updates using ZK should fail if broker is running.
-    registerBrokerInZk(brokerId.toInt)
-    assertThrows(classOf[IllegalArgumentException], () => 
alterConfigWithZk(Map("message.max.size" -> "210000"), Some(brokerId)))
-    assertThrows(classOf[IllegalArgumentException], () => 
alterConfigWithZk(Map("message.max.size" -> "220000"), None))
-
-    // Dynamic config updates using ZK should for a different broker that is 
not running should succeed
-    alterAndVerifyConfig(Map("message.max.size" -> "230000"), Some("2"))
-  }
-
   @Test
   def shouldNotUpdateBrokerConfigIfMalformedConfigUsingZookeeper(): Unit = {
     val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
@@ -1331,7 +1229,7 @@ class ConfigCommandTest extends QuorumTestHarness with 
Logging {
       "--entity-type", "brokers",
       "--alter",
       "--add-config", "a=="))
-    assertThrows(classOf[IllegalArgumentException], () => 
ConfigCommand.alterConfigWithZk(null, createOpts, new 
DummyAdminZkClient(zkClient)))
+    assertThrows(classOf[IllegalArgumentException], () => 
ConfigCommand.alterConfigWithZk(null, createOpts, dummyAdminZkClient))
   }
 
   @Test
@@ -1351,7 +1249,7 @@ class ConfigCommandTest extends QuorumTestHarness with Logging {
       "--entity-type", "brokers",
       "--alter",
       "--add-config", "a=[b,c,d=e"))
-    assertThrows(classOf[IllegalArgumentException], () => ConfigCommand.alterConfigWithZk(null, createOpts, new DummyAdminZkClient(zkClient)))
+    assertThrows(classOf[IllegalArgumentException], () => ConfigCommand.alterConfigWithZk(null, createOpts, dummyAdminZkClient))
   }
 
   @Test
@@ -1371,7 +1269,7 @@ class ConfigCommandTest extends QuorumTestHarness with Logging {
       "--entity-type", "topics",
       "--alter",
       "--delete-config", "missing_config1, missing_config2"))
-    assertThrows(classOf[InvalidConfigurationException], () => ConfigCommand.alterConfigWithZk(null, createOpts, new DummyAdminZkClient(zkClient)))
+    assertThrows(classOf[InvalidConfigurationException], () => ConfigCommand.alterConfigWithZk(null, createOpts, dummyAdminZkClient))
   }
 
   @Test
@@ -1432,7 +1330,7 @@ class ConfigCommandTest extends QuorumTestHarness with Logging {
     val mockBroker: Broker = mock(classOf[Broker])
     when(mockZkClient.getBroker(1)).thenReturn(Option(mockBroker))
 
-    assertThrows(classOf[IllegalArgumentException], () => ConfigCommand.alterConfigWithZk(mockZkClient, createOpts, new TestAdminZkClient(zkClient)))
+    assertThrows(classOf[IllegalArgumentException], () => ConfigCommand.alterConfigWithZk(mockZkClient, createOpts, new TestAdminZkClient(null)))
   }
 
   @Test
@@ -1452,7 +1350,7 @@ class ConfigCommandTest extends QuorumTestHarness with Logging {
         "--delete-config", mechanism))
 
     val credentials = mutable.Map[String, Properties]()
-    case class CredentialChange(user: String, mechanisms: Set[String], iterations: Int) extends AdminZkClient(zkClient) {
+    case class CredentialChange(user: String, mechanisms: Set[String], iterations: Int) extends AdminZkClient(null) {
       override def fetchEntityConfig(entityType: String, entityName: String): Properties = {
         credentials.getOrElse(entityName, new Properties())
       }
@@ -1679,14 +1577,6 @@ class ConfigCommandTest extends QuorumTestHarness with Logging {
         Seq("<default>/clients/client-3", sanitizedPrincipal + "/clients/client-2"))
   }
 
-  private def registerBrokerInZk(id: Int): Unit = {
-    zkClient.createTopLevelPaths()
-    val securityProtocol = SecurityProtocol.PLAINTEXT
-    val endpoint = new EndPoint("localhost", 9092, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
-    val brokerInfo = BrokerInfo(Broker(id, Seq(endpoint), rack = None), ApiVersion.latestVersion, jmxPort = 9192)
-    zkClient.registerBroker(brokerInfo)
-  }
-
   class DummyAdminZkClient(zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
     override def changeBrokerConfig(brokerIds: Seq[Int], configs: Properties): Unit = {}
     override def fetchEntityConfig(entityType: String, entityName: String): Properties = {new Properties}
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index 9586cf5395..dcbf2cff13 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -16,15 +16,23 @@
  */
 package kafka.admin
 
-import kafka.admin.TopicCommand.{PartitionDescription, TopicCommandOptions}
+import kafka.admin.TopicCommand.{PartitionDescription, TopicCommandOptions, TopicService}
 import kafka.common.AdminCommandFailedException
 import kafka.utils.Exit
-import org.apache.kafka.clients.admin.PartitionReassignment
+import org.apache.kafka.clients.admin.{Admin, AdminClientTestUtils, CreatePartitionsOptions, CreateTopicsOptions, DeleteTopicsOptions, NewPartitions, NewTopic, PartitionReassignment, TopicDescription}
 import org.apache.kafka.common.Node
 import org.apache.kafka.common.TopicPartitionInfo
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException
+import org.apache.kafka.common.protocol.Errors
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
+import org.mockito.ArgumentMatcher
+import org.mockito.ArgumentMatchers.{any, argThat, eq => eqThat}
+import org.mockito.Mockito.{mock, times, verify, when}
 
+import java.util.{Collection, Collections, Optional}
+import scala.collection.Seq
+import scala.concurrent.ExecutionException
 import scala.jdk.CollectionConverters._
 
 class TopicCommandTest {
@@ -159,6 +167,74 @@ class TopicCommandTest {
     assertEquals(expectedAssignment, actualAssignment)
   }
 
+  @Test
+  def testCreateTopicDoesNotRetryThrottlingQuotaExceededException(): Unit = {
+    val adminClient = mock(classOf[Admin])
+    val topicService = TopicService(adminClient)
+
+    val result = AdminClientTestUtils.createTopicsResult(topicName, Errors.THROTTLING_QUOTA_EXCEEDED.exception())
+    when(adminClient.createTopics(any(), any())).thenReturn(result)
+
+    assertThrows(classOf[ThrottlingQuotaExceededException],
+      () => topicService.createTopic(new TopicCommandOptions(Array("--topic", topicName))))
+
+    val expectedNewTopic = new NewTopic(topicName, Optional.empty[Integer](), Optional.empty[java.lang.Short]())
+      .configs(Map.empty[String, String].asJava)
+
+    verify(adminClient, times(1)).createTopics(
+      eqThat(Set(expectedNewTopic).asJava),
+      argThat((_.shouldRetryOnQuotaViolation() == false): ArgumentMatcher[CreateTopicsOptions])
+    )
+  }
+
+  @Test
+  def testDeleteTopicDoesNotRetryThrottlingQuotaExceededException(): Unit = {
+    val adminClient = mock(classOf[Admin])
+    val topicService = TopicService(adminClient)
+
+    val listResult = AdminClientTestUtils.listTopicsResult(topicName)
+    when(adminClient.listTopics(any())).thenReturn(listResult)
+
+    val result = AdminClientTestUtils.deleteTopicsResult(topicName, Errors.THROTTLING_QUOTA_EXCEEDED.exception())
+    when(adminClient.deleteTopics(any[Collection[String]](), any())).thenReturn(result)
+
+    val exception = assertThrows(classOf[ExecutionException],
+      () => topicService.deleteTopic(new TopicCommandOptions(Array("--topic", topicName))))
+    assertTrue(exception.getCause.isInstanceOf[ThrottlingQuotaExceededException])
+
+    verify(adminClient, times(1)).deleteTopics(
+      eqThat(Seq(topicName).asJavaCollection),
+      argThat((_.shouldRetryOnQuotaViolation() == false): ArgumentMatcher[DeleteTopicsOptions])
+    )
+  }
+
+  @Test
+  def testCreatePartitionsDoesNotRetryThrottlingQuotaExceededException(): Unit = {
+    val adminClient = mock(classOf[Admin])
+    val topicService = TopicService(adminClient)
+
+    val listResult = AdminClientTestUtils.listTopicsResult(topicName)
+    when(adminClient.listTopics(any())).thenReturn(listResult)
+
+    val topicPartitionInfo = new TopicPartitionInfo(0, new Node(0, "", 0),
+      Collections.emptyList(), Collections.emptyList())
+    val describeResult = AdminClientTestUtils.describeTopicsResult(topicName, new TopicDescription(
+      topicName, false, Collections.singletonList(topicPartitionInfo)))
+    when(adminClient.describeTopics(any(classOf[java.util.Collection[String]]))).thenReturn(describeResult)
+
+    val result = AdminClientTestUtils.createPartitionsResult(topicName, Errors.THROTTLING_QUOTA_EXCEEDED.exception())
+    when(adminClient.createPartitions(any(), any())).thenReturn(result)
+
+    val exception = assertThrows(classOf[ExecutionException],
+      () => topicService.alterTopic(new TopicCommandOptions(Array("--topic", topicName, "--partitions", "3"))))
+    assertTrue(exception.getCause.isInstanceOf[ThrottlingQuotaExceededException])
+
+    verify(adminClient, times(1)).createPartitions(
+      argThat((_.get(topicName).totalCount() == 3): ArgumentMatcher[java.util.Map[String, NewPartitions]]),
+      argThat((_.shouldRetryOnQuotaViolation() == false): ArgumentMatcher[CreatePartitionsOptions])
+    )
+  }
+
   private[this] def assertCheckArgsExitCode(expected: Int, options: TopicCommandOptions): Unit = {
     Exit.setExitProcedure {
       (exitCode: Int, _: Option[String]) =>

Reply via email to