This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 77cd827104 MINOR: Move some TopicCommand and ConfigCommand integration
tests to unit tests (#12024)
77cd827104 is described below
commit 77cd8271047d5d099dde104e912161e0787a0f18
Author: dengziming <[email protected]>
AuthorDate: Fri Apr 15 16:35:52 2022 +0800
MINOR: Move some TopicCommand and ConfigCommand integration tests to unit
tests (#12024)
Move some TopicCommand and ConfigCommand integration tests to unit tests to
speed up the tests
Reviewers: Luke Chen <[email protected]>
---
.../kafka/admin/ConfigCommandIntegrationTest.scala | 175 ++++++++++++++++++++
.../kafka/admin/TopicCommandIntegrationTest.scala | 79 +--------
.../scala/unit/kafka/admin/ConfigCommandTest.scala | 176 ++++-----------------
.../scala/unit/kafka/admin/TopicCommandTest.scala | 80 +++++++++-
4 files changed, 289 insertions(+), 221 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/admin/ConfigCommandIntegrationTest.scala
b/core/src/test/scala/integration/kafka/admin/ConfigCommandIntegrationTest.scala
new file mode 100644
index 0000000000..db7ab91729
--- /dev/null
+++
b/core/src/test/scala/integration/kafka/admin/ConfigCommandIntegrationTest.scala
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import kafka.admin.ConfigCommand.ConfigCommandOptions
+import kafka.api.ApiVersion
+import kafka.cluster.{Broker, EndPoint}
+import kafka.server.{ConfigEntityName, KafkaConfig, QuorumTestHarness}
+import kafka.utils.{Exit, Logging}
+import kafka.zk.{AdminZkClient, BrokerInfo}
+import org.apache.kafka.common.config.ConfigException
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+class ConfigCommandIntegrationTest extends QuorumTestHarness with Logging {
+
+ @Test
+ def shouldExitWithNonZeroStatusOnUpdatingUnallowedConfigViaZk(): Unit = {
+ assertNonZeroStatusExit(Array(
+ "--zookeeper", zkConnect,
+ "--entity-name", "1",
+ "--entity-type", "brokers",
+ "--alter",
+ "--add-config", "security.inter.broker.protocol=PLAINTEXT"))
+ }
+
+ @Test
+ def shouldExitWithNonZeroStatusOnZkCommandAlterUserQuota(): Unit = {
+ assertNonZeroStatusExit(Array(
+ "--zookeeper", zkConnect,
+ "--entity-type", "users",
+ "--entity-name", "admin",
+ "--alter", "--add-config", "consumer_byte_rate=20000"))
+ }
+
+ private def assertNonZeroStatusExit(args: Array[String]): Unit = {
+ var exitStatus: Option[Int] = None
+ Exit.setExitProcedure { (status, _) =>
+ exitStatus = Some(status)
+ throw new RuntimeException
+ }
+
+ try {
+ ConfigCommand.main(args)
+ } catch {
+ case _: RuntimeException =>
+ } finally {
+ Exit.resetExitProcedure()
+ }
+
+ assertEquals(Some(1), exitStatus)
+ }
+
+ @Test
+ def testDynamicBrokerConfigUpdateUsingZooKeeper(): Unit = {
+ val brokerId = "1"
+ val adminZkClient = new AdminZkClient(zkClient)
+ val alterOpts = Array("--zookeeper", zkConnect, "--entity-type",
"brokers", "--alter")
+
+ def entityOpt(brokerId: Option[String]): Array[String] = {
+ brokerId.map(id => Array("--entity-name",
id)).getOrElse(Array("--entity-default"))
+ }
+
+ def alterConfigWithZk(configs: Map[String, String], brokerId:
Option[String],
+ encoderConfigs: Map[String, String] = Map.empty):
Unit = {
+ val configStr = (configs ++ encoderConfigs).map { case (k, v) =>
s"$k=$v" }.mkString(",")
+ val addOpts = new ConfigCommandOptions(alterOpts ++ entityOpt(brokerId)
++ Array("--add-config", configStr))
+ ConfigCommand.alterConfigWithZk(zkClient, addOpts, adminZkClient)
+ }
+
+ def verifyConfig(configs: Map[String, String], brokerId: Option[String]):
Unit = {
+ val entityConfigs = zkClient.getEntityConfigs("brokers",
brokerId.getOrElse(ConfigEntityName.Default))
+ assertEquals(configs, entityConfigs.asScala)
+ }
+
+ def alterAndVerifyConfig(configs: Map[String, String], brokerId:
Option[String]): Unit = {
+ alterConfigWithZk(configs, brokerId)
+ verifyConfig(configs, brokerId)
+ }
+
+ def deleteAndVerifyConfig(configNames: Set[String], brokerId:
Option[String]): Unit = {
+ val deleteOpts = new ConfigCommandOptions(alterOpts ++
entityOpt(brokerId) ++
+ Array("--delete-config", configNames.mkString(",")))
+ ConfigCommand.alterConfigWithZk(zkClient, deleteOpts, adminZkClient)
+ verifyConfig(Map.empty, brokerId)
+ }
+
+ // Add config
+ alterAndVerifyConfig(Map("message.max.size" -> "110000"), Some(brokerId))
+ alterAndVerifyConfig(Map("message.max.size" -> "120000"), None)
+
+ // Change config
+ alterAndVerifyConfig(Map("message.max.size" -> "130000"), Some(brokerId))
+ alterAndVerifyConfig(Map("message.max.size" -> "140000"), None)
+
+ // Delete config
+ deleteAndVerifyConfig(Set("message.max.size"), Some(brokerId))
+ deleteAndVerifyConfig(Set("message.max.size"), None)
+
+ // Listener configs: should work only with listener name
+ alterAndVerifyConfig(Map("listener.name.external.ssl.keystore.location" ->
"/tmp/test.jks"), Some(brokerId))
+ assertThrows(classOf[ConfigException], () =>
alterConfigWithZk(Map("ssl.keystore.location" -> "/tmp/test.jks"),
Some(brokerId)))
+
+ // Per-broker config configured at default cluster-level should fail
+ assertThrows(classOf[ConfigException], () =>
alterConfigWithZk(Map("listener.name.external.ssl.keystore.location" ->
"/tmp/test.jks"), None))
+ deleteAndVerifyConfig(Set("listener.name.external.ssl.keystore.location"),
Some(brokerId))
+
+ // Password config update without encoder secret should fail
+ assertThrows(classOf[IllegalArgumentException], () =>
alterConfigWithZk(Map("listener.name.external.ssl.keystore.password" ->
"secret"), Some(brokerId)))
+
+ // Password config update with encoder secret should succeed and encoded
password must be stored in ZK
+ val configs = Map("listener.name.external.ssl.keystore.password" ->
"secret", "log.cleaner.threads" -> "2")
+ val encoderConfigs = Map(KafkaConfig.PasswordEncoderSecretProp ->
"encoder-secret")
+ alterConfigWithZk(configs, Some(brokerId), encoderConfigs)
+ val brokerConfigs = zkClient.getEntityConfigs("brokers", brokerId)
+ assertFalse(brokerConfigs.contains(KafkaConfig.PasswordEncoderSecretProp),
"Encoder secret stored in ZooKeeper")
+ assertEquals("2", brokerConfigs.getProperty("log.cleaner.threads")) // not
encoded
+ val encodedPassword =
brokerConfigs.getProperty("listener.name.external.ssl.keystore.password")
+ val passwordEncoder = ConfigCommand.createPasswordEncoder(encoderConfigs)
+ assertEquals("secret", passwordEncoder.decode(encodedPassword).value)
+ assertEquals(configs.size, brokerConfigs.size)
+
+ // Password config update with overrides for encoder parameters
+ val configs2 = Map("listener.name.internal.ssl.keystore.password" ->
"secret2")
+ val encoderConfigs2 = Map(KafkaConfig.PasswordEncoderSecretProp ->
"encoder-secret",
+ KafkaConfig.PasswordEncoderCipherAlgorithmProp -> "DES/CBC/PKCS5Padding",
+ KafkaConfig.PasswordEncoderIterationsProp -> "1024",
+ KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp ->
"PBKDF2WithHmacSHA1",
+ KafkaConfig.PasswordEncoderKeyLengthProp -> "64")
+ alterConfigWithZk(configs2, Some(brokerId), encoderConfigs2)
+ val brokerConfigs2 = zkClient.getEntityConfigs("brokers", brokerId)
+ val encodedPassword2 =
brokerConfigs2.getProperty("listener.name.internal.ssl.keystore.password")
+ assertEquals("secret2",
ConfigCommand.createPasswordEncoder(encoderConfigs).decode(encodedPassword2).value)
+ assertEquals("secret2",
ConfigCommand.createPasswordEncoder(encoderConfigs2).decode(encodedPassword2).value)
+
+
+ // Password config update at default cluster-level should fail
+ assertThrows(classOf[ConfigException], () => alterConfigWithZk(configs,
None, encoderConfigs))
+
+ // Dynamic config updates using ZK should fail if broker is running.
+ registerBrokerInZk(brokerId.toInt)
+ assertThrows(classOf[IllegalArgumentException], () =>
alterConfigWithZk(Map("message.max.size" -> "210000"), Some(brokerId)))
+ assertThrows(classOf[IllegalArgumentException], () =>
alterConfigWithZk(Map("message.max.size" -> "220000"), None))
+
+ // Dynamic config updates using ZK should for a different broker that is
not running should succeed
+ alterAndVerifyConfig(Map("message.max.size" -> "230000"), Some("2"))
+ }
+
+ private def registerBrokerInZk(id: Int): Unit = {
+ zkClient.createTopLevelPaths()
+ val securityProtocol = SecurityProtocol.PLAINTEXT
+ val endpoint = new EndPoint("localhost", 9092,
ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
+ val brokerInfo = BrokerInfo(Broker(id, Seq(endpoint), rack = None),
ApiVersion.latestVersion, jmxPort = 9192)
+ zkClient.registerBroker(brokerInfo)
+ }
+}
diff --git
a/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala
b/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
similarity index 90%
rename from
core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala
rename to
core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
index ee7e64957e..9abf7487d7 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
@@ -16,7 +16,7 @@
*/
package kafka.admin
-import java.util.{Collection, Collections, Optional, Properties}
+import java.util.{Collections, Optional, Properties}
import kafka.admin.TopicCommand.{TopicCommandOptions, TopicService}
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
@@ -24,19 +24,17 @@ import kafka.utils.{Logging, TestInfoUtils, TestUtils}
import kafka.zk.{ConfigEntityChangeNotificationZNode, DeleteTopicsTopicZNode}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin._
+import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.{ConfigException, ConfigResource,
TopicConfig}
-import org.apache.kafka.common.errors.{ClusterAuthorizationException,
InvalidTopicException, ThrottlingQuotaExceededException, TopicExistsException}
+import org.apache.kafka.common.errors.{ClusterAuthorizationException,
InvalidTopicException, TopicExistsException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.{Node, TopicPartition, TopicPartitionInfo}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
-import org.mockito.ArgumentMatcher
-import org.mockito.ArgumentMatchers.{eq => eqThat, _}
import org.mockito.Mockito._
import scala.collection.Seq
@@ -812,75 +810,4 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
assertThrows(classOf[InvalidTopicException],
() => topicService.createTopic(new TopicCommandOptions(Array("--topic",
"foo.bar"))))
}
-
- @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk", "kraft"))
- def testCreateTopicDoesNotRetryThrottlingQuotaExceededException(quorum:
String): Unit = {
- val adminClient = mock(classOf[Admin])
- val topicService = TopicService(adminClient)
-
- val result = AdminClientTestUtils.createTopicsResult(testTopicName,
Errors.THROTTLING_QUOTA_EXCEEDED.exception())
- when(adminClient.createTopics(any(), any())).thenReturn(result)
-
- assertThrows(classOf[ThrottlingQuotaExceededException],
- () => topicService.createTopic(new TopicCommandOptions(Array("--topic",
testTopicName))))
-
- val expectedNewTopic = new NewTopic(testTopicName,
Optional.empty[Integer](), Optional.empty[java.lang.Short]())
- .configs(Map.empty[String, String].asJava)
-
- verify(adminClient, times(1)).createTopics(
- eqThat(Set(expectedNewTopic).asJava),
- argThat((_.shouldRetryOnQuotaViolation() == false):
ArgumentMatcher[CreateTopicsOptions])
- )
- }
-
- @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk", "kraft"))
- def testDeleteTopicDoesNotRetryThrottlingQuotaExceededException(quorum:
String): Unit = {
- val adminClient = mock(classOf[Admin])
- val topicService = TopicService(adminClient)
-
- val listResult = AdminClientTestUtils.listTopicsResult(testTopicName)
- when(adminClient.listTopics(any())).thenReturn(listResult)
-
- val result = AdminClientTestUtils.deleteTopicsResult(testTopicName,
Errors.THROTTLING_QUOTA_EXCEEDED.exception())
- when(adminClient.deleteTopics(any[Collection[String]](),
any())).thenReturn(result)
-
- val exception = assertThrows(classOf[ExecutionException],
- () => topicService.deleteTopic(new TopicCommandOptions(Array("--topic",
testTopicName))))
-
assertTrue(exception.getCause.isInstanceOf[ThrottlingQuotaExceededException])
-
- verify(adminClient, times(1)).deleteTopics(
- eqThat(Seq(testTopicName).asJavaCollection),
- argThat((_.shouldRetryOnQuotaViolation() == false):
ArgumentMatcher[DeleteTopicsOptions])
- )
- }
-
- @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
- @ValueSource(strings = Array("zk", "kraft"))
- def testCreatePartitionsDoesNotRetryThrottlingQuotaExceededException(quorum:
String): Unit = {
- val adminClient = mock(classOf[Admin])
- val topicService = TopicService(adminClient)
-
- val listResult = AdminClientTestUtils.listTopicsResult(testTopicName)
- when(adminClient.listTopics(any())).thenReturn(listResult)
-
- val topicPartitionInfo = new TopicPartitionInfo(0, new Node(0, "", 0),
- Collections.emptyList(), Collections.emptyList())
- val describeResult =
AdminClientTestUtils.describeTopicsResult(testTopicName, new TopicDescription(
- testTopicName, false, Collections.singletonList(topicPartitionInfo)))
-
when(adminClient.describeTopics(any(classOf[java.util.Collection[String]]))).thenReturn(describeResult)
-
- val result = AdminClientTestUtils.createPartitionsResult(testTopicName,
Errors.THROTTLING_QUOTA_EXCEEDED.exception())
- when(adminClient.createPartitions(any(), any())).thenReturn(result)
-
- val exception = assertThrows(classOf[ExecutionException],
- () => topicService.alterTopic(new TopicCommandOptions(Array("--topic",
testTopicName, "--partitions", "3"))))
-
assertTrue(exception.getCause.isInstanceOf[ThrottlingQuotaExceededException])
-
- verify(adminClient, times(1)).createPartitions(
- argThat((_.get(testTopicName).totalCount() == 3):
ArgumentMatcher[java.util.Map[String, NewPartitions]]),
- argThat((_.shouldRetryOnQuotaViolation() == false):
ArgumentMatcher[CreatePartitionsOptions])
- )
- }
}
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index c7ccf822b9..8516ba3a24 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -19,46 +19,37 @@ package kafka.admin
import java.util
import java.util.Properties
import kafka.admin.ConfigCommand.ConfigCommandOptions
-import kafka.api.ApiVersion
-import kafka.cluster.{Broker, EndPoint}
-import kafka.server.{ConfigEntityName, ConfigType, KafkaConfig,
QuorumTestHarness}
+import kafka.cluster.Broker
+import kafka.server.{ConfigEntityName, ConfigType}
import kafka.utils.{Exit, Logging}
-import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient}
+import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.Node
-import org.apache.kafka.common.config.{ConfigException, ConfigResource}
+import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.common.internals.KafkaFutureImpl
-import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.quota.{ClientQuotaAlteration,
ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
-import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
import org.apache.kafka.common.utils.Sanitizer
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
+import org.mockito.ArgumentMatchers.anyString
import org.mockito.Mockito.{mock, times, verify, when}
import scala.collection.{Seq, mutable}
import scala.jdk.CollectionConverters._
-class ConfigCommandTest extends QuorumTestHarness with Logging {
+class ConfigCommandTest extends Logging {
+
+ private val zkConnect = "localhost:2181"
+ private val dummyAdminZkClient = new DummyAdminZkClient(null)
@Test
def shouldExitWithNonZeroStatusOnArgError(): Unit = {
assertNonZeroStatusExit(Array("--blah"))
}
- @Test
- def shouldExitWithNonZeroStatusOnUpdatingUnallowedConfigViaZk(): Unit = {
- assertNonZeroStatusExit(Array(
- "--zookeeper", zkConnect,
- "--entity-name", "1",
- "--entity-type", "brokers",
- "--alter",
- "--add-config", "security.inter.broker.protocol=PLAINTEXT"))
- }
-
@Test
def shouldExitWithNonZeroStatusOnZkCommandWithTopicsEntity(): Unit = {
assertNonZeroStatusExit(Array(
@@ -83,15 +74,6 @@ class ConfigCommandTest extends QuorumTestHarness with
Logging {
"--describe"))
}
- @Test
- def shouldExitWithNonZeroStatusOnZkCommandAlterUserQuota(): Unit = {
- assertNonZeroStatusExit(Array(
- "--zookeeper", zkConnect,
- "--entity-type", "users",
- "--entity-name", "admin",
- "--alter", "--add-config", "consumer_byte_rate=20000"))
- }
-
@Test
def shouldExitWithNonZeroStatusAlterUserQuotaWithoutEntityName(): Unit = {
assertNonZeroStatusExit(Array(
@@ -100,7 +82,6 @@ class ConfigCommandTest extends QuorumTestHarness with
Logging {
"--alter", "--add-config", "consumer_byte_rate=20000"))
}
-
@Test
def shouldExitWithNonZeroStatusOnBrokerCommandError(): Unit = {
assertNonZeroStatusExit(Array(
@@ -391,7 +372,7 @@ class ConfigCommandTest extends QuorumTestHarness with
Logging {
def shouldFailIfUnrecognisedEntityTypeUsingZookeeper(): Unit = {
val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
"--entity-name", "client", "--entity-type", "not-recognised", "--alter",
"--add-config", "a=b,c=d"))
- assertThrows(classOf[IllegalArgumentException], () =>
ConfigCommand.alterConfigWithZk(null, createOpts, new
DummyAdminZkClient(zkClient)))
+ assertThrows(classOf[IllegalArgumentException], () =>
ConfigCommand.alterConfigWithZk(null, createOpts, dummyAdminZkClient))
}
@Test
@@ -405,7 +386,7 @@ class ConfigCommandTest extends QuorumTestHarness with
Logging {
def shouldFailIfBrokerEntityTypeIsNotAnIntegerUsingZookeeper(): Unit = {
val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
"--entity-name", "A", "--entity-type", "brokers", "--alter",
"--add-config", "a=b,c=d"))
- assertThrows(classOf[IllegalArgumentException], () =>
ConfigCommand.alterConfigWithZk(null, createOpts, new
DummyAdminZkClient(zkClient)))
+ assertThrows(classOf[IllegalArgumentException], () =>
ConfigCommand.alterConfigWithZk(null, createOpts, dummyAdminZkClient))
}
@Test
@@ -419,7 +400,7 @@ class ConfigCommandTest extends QuorumTestHarness with
Logging {
def shouldFailIfShortBrokerEntityTypeIsNotAnIntegerUsingZookeeper(): Unit = {
val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
"--broker", "A", "--alter", "--add-config", "a=b,c=d"))
- assertThrows(classOf[IllegalArgumentException], () =>
ConfigCommand.alterConfigWithZk(null, createOpts, new
DummyAdminZkClient(zkClient)))
+ assertThrows(classOf[IllegalArgumentException], () =>
ConfigCommand.alterConfigWithZk(null, createOpts, dummyAdminZkClient))
}
@Test
@@ -479,6 +460,9 @@ class ConfigCommandTest extends QuorumTestHarness with
Logging {
"--alter",
"--add-config", "a=b,c=d"))
+ val zkClient = mock(classOf[KafkaZkClient])
+ when(zkClient.getEntityConfigs(anyString(), anyString())).thenReturn(new
Properties())
+
class TestAdminZkClient(zkClient: KafkaZkClient) extends
AdminZkClient(zkClient) {
override def changeClientIdConfig(clientId: String, configChange:
Properties): Unit = {
assertEquals("my-client-id", clientId)
@@ -498,6 +482,9 @@ class ConfigCommandTest extends QuorumTestHarness with
Logging {
"--alter",
"--add-config", "a=b,c=d"))
+ val zkClient = mock(classOf[KafkaZkClient])
+ when(zkClient.getEntityConfigs(anyString(), anyString())).thenReturn(new
Properties())
+
class TestAdminZkClient(zkClient: KafkaZkClient) extends
AdminZkClient(zkClient) {
override def changeIpConfig(ip: String, configChange: Properties): Unit
= {
assertEquals("1.2.3.4", ip)
@@ -774,6 +761,9 @@ class ConfigCommandTest extends QuorumTestHarness with
Logging {
"--alter",
"--add-config", "a=b,c=d"))
+ val zkClient = mock(classOf[KafkaZkClient])
+ when(zkClient.getEntityConfigs(anyString(), anyString())).thenReturn(new
Properties())
+
class TestAdminZkClient(zkClient: KafkaZkClient) extends
AdminZkClient(zkClient) {
override def changeTopicConfig(topic: String, configChange: Properties):
Unit = {
assertEquals("my-topic", topic)
@@ -909,7 +899,7 @@ class ConfigCommandTest extends QuorumTestHarness with
Logging {
when(mockZkClient.getBroker(1)).thenReturn(Option(mockBroker))
assertThrows(classOf[IllegalArgumentException],
- () => ConfigCommand.alterConfigWithZk(mockZkClient, alterOpts, new
DummyAdminZkClient(zkClient)))
+ () => ConfigCommand.alterConfigWithZk(mockZkClient, alterOpts,
dummyAdminZkClient))
}
@Test
@@ -924,7 +914,7 @@ class ConfigCommandTest extends QuorumTestHarness with
Logging {
when(mockZkClient.getBroker(1)).thenReturn(Option(mockBroker))
assertThrows(classOf[IllegalArgumentException],
- () => ConfigCommand.describeConfigWithZk(mockZkClient, describeOpts, new
DummyAdminZkClient(zkClient)))
+ () => ConfigCommand.describeConfigWithZk(mockZkClient, describeOpts,
dummyAdminZkClient))
}
@Test
@@ -946,7 +936,7 @@ class ConfigCommandTest extends QuorumTestHarness with
Logging {
val mockZkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
when(mockZkClient.getBroker(1)).thenReturn(None)
- ConfigCommand.describeConfigWithZk(mockZkClient, describeOpts, new
TestAdminZkClient(zkClient))
+ ConfigCommand.describeConfigWithZk(mockZkClient, describeOpts, new
TestAdminZkClient(null))
}
@Test
@@ -1197,6 +1187,9 @@ class ConfigCommandTest extends QuorumTestHarness with
Logging {
"--alter",
"--add-config", "a=b,c=[d,e ,f],g=[h,i]"))
+ val zkClient = mock(classOf[KafkaZkClient])
+ when(zkClient.getEntityConfigs(anyString(), anyString())).thenReturn(new
Properties())
+
class TestAdminZkClient(zkClient: KafkaZkClient) extends
AdminZkClient(zkClient) {
override def changeTopicConfig(topic: String, configChange: Properties):
Unit = {
assertEquals("my-topic", topic)
@@ -1216,7 +1209,7 @@ class ConfigCommandTest extends QuorumTestHarness with
Logging {
"--entity-type", "brokers",
"--alter",
"--add-config", "leader.replication.throttled.rate=10"))
- assertThrows(classOf[IllegalArgumentException], () =>
ConfigCommand.alterConfigWithZk(null, createOpts, new
DummyAdminZkClient(zkClient)))
+ assertThrows(classOf[IllegalArgumentException], () =>
ConfigCommand.alterConfigWithZk(null, createOpts, dummyAdminZkClient))
}
@Test
@@ -1229,101 +1222,6 @@ class ConfigCommandTest extends QuorumTestHarness with
Logging {
assertThrows(classOf[IllegalArgumentException], () =>
ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)),
createOpts))
}
- @Test
- def testDynamicBrokerConfigUpdateUsingZooKeeper(): Unit = {
- val brokerId = "1"
- val adminZkClient = new AdminZkClient(zkClient)
- val alterOpts = Array("--zookeeper", zkConnect, "--entity-type",
"brokers", "--alter")
-
- def entityOpt(brokerId: Option[String]): Array[String] = {
- brokerId.map(id => Array("--entity-name",
id)).getOrElse(Array("--entity-default"))
- }
-
- def alterConfigWithZk(configs: Map[String, String], brokerId:
Option[String],
- encoderConfigs: Map[String, String] = Map.empty):
Unit = {
- val configStr = (configs ++ encoderConfigs).map { case (k, v) =>
s"$k=$v" }.mkString(",")
- val addOpts = new ConfigCommandOptions(alterOpts ++ entityOpt(brokerId)
++ Array("--add-config", configStr))
- ConfigCommand.alterConfigWithZk(zkClient, addOpts, adminZkClient)
- }
-
- def verifyConfig(configs: Map[String, String], brokerId: Option[String]):
Unit = {
- val entityConfigs = zkClient.getEntityConfigs("brokers",
brokerId.getOrElse(ConfigEntityName.Default))
- assertEquals(configs, entityConfigs.asScala)
- }
-
- def alterAndVerifyConfig(configs: Map[String, String], brokerId:
Option[String]): Unit = {
- alterConfigWithZk(configs, brokerId)
- verifyConfig(configs, brokerId)
- }
-
- def deleteAndVerifyConfig(configNames: Set[String], brokerId:
Option[String]): Unit = {
- val deleteOpts = new ConfigCommandOptions(alterOpts ++
entityOpt(brokerId) ++
- Array("--delete-config", configNames.mkString(",")))
- ConfigCommand.alterConfigWithZk(zkClient, deleteOpts, adminZkClient)
- verifyConfig(Map.empty, brokerId)
- }
-
- // Add config
- alterAndVerifyConfig(Map("message.max.size" -> "110000"), Some(brokerId))
- alterAndVerifyConfig(Map("message.max.size" -> "120000"), None)
-
- // Change config
- alterAndVerifyConfig(Map("message.max.size" -> "130000"), Some(brokerId))
- alterAndVerifyConfig(Map("message.max.size" -> "140000"), None)
-
- // Delete config
- deleteAndVerifyConfig(Set("message.max.size"), Some(brokerId))
- deleteAndVerifyConfig(Set("message.max.size"), None)
-
- // Listener configs: should work only with listener name
- alterAndVerifyConfig(Map("listener.name.external.ssl.keystore.location" ->
"/tmp/test.jks"), Some(brokerId))
- assertThrows(classOf[ConfigException], () =>
alterConfigWithZk(Map("ssl.keystore.location" -> "/tmp/test.jks"),
Some(brokerId)))
-
- // Per-broker config configured at default cluster-level should fail
- assertThrows(classOf[ConfigException], () =>
alterConfigWithZk(Map("listener.name.external.ssl.keystore.location" ->
"/tmp/test.jks"), None))
- deleteAndVerifyConfig(Set("listener.name.external.ssl.keystore.location"),
Some(brokerId))
-
- // Password config update without encoder secret should fail
- assertThrows(classOf[IllegalArgumentException], () =>
alterConfigWithZk(Map("listener.name.external.ssl.keystore.password" ->
"secret"), Some(brokerId)))
-
- // Password config update with encoder secret should succeed and encoded
password must be stored in ZK
- val configs = Map("listener.name.external.ssl.keystore.password" ->
"secret", "log.cleaner.threads" -> "2")
- val encoderConfigs = Map(KafkaConfig.PasswordEncoderSecretProp ->
"encoder-secret")
- alterConfigWithZk(configs, Some(brokerId), encoderConfigs)
- val brokerConfigs = zkClient.getEntityConfigs("brokers", brokerId)
- assertFalse(brokerConfigs.contains(KafkaConfig.PasswordEncoderSecretProp),
"Encoder secret stored in ZooKeeper")
- assertEquals("2", brokerConfigs.getProperty("log.cleaner.threads")) // not
encoded
- val encodedPassword =
brokerConfigs.getProperty("listener.name.external.ssl.keystore.password")
- val passwordEncoder = ConfigCommand.createPasswordEncoder(encoderConfigs)
- assertEquals("secret", passwordEncoder.decode(encodedPassword).value)
- assertEquals(configs.size, brokerConfigs.size)
-
- // Password config update with overrides for encoder parameters
- val configs2 = Map("listener.name.internal.ssl.keystore.password" ->
"secret2")
- val encoderConfigs2 = Map(KafkaConfig.PasswordEncoderSecretProp ->
"encoder-secret",
- KafkaConfig.PasswordEncoderCipherAlgorithmProp -> "DES/CBC/PKCS5Padding",
- KafkaConfig.PasswordEncoderIterationsProp -> "1024",
- KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp ->
"PBKDF2WithHmacSHA1",
- KafkaConfig.PasswordEncoderKeyLengthProp -> "64")
- alterConfigWithZk(configs2, Some(brokerId), encoderConfigs2)
- val brokerConfigs2 = zkClient.getEntityConfigs("brokers", brokerId)
- val encodedPassword2 =
brokerConfigs2.getProperty("listener.name.internal.ssl.keystore.password")
- assertEquals("secret2",
ConfigCommand.createPasswordEncoder(encoderConfigs).decode(encodedPassword2).value)
- assertEquals("secret2",
ConfigCommand.createPasswordEncoder(encoderConfigs2).decode(encodedPassword2).value)
-
-
- // Password config update at default cluster-level should fail
- assertThrows(classOf[ConfigException], () => alterConfigWithZk(configs,
None, encoderConfigs))
-
- // Dynamic config updates using ZK should fail if broker is running.
- registerBrokerInZk(brokerId.toInt)
- assertThrows(classOf[IllegalArgumentException], () =>
alterConfigWithZk(Map("message.max.size" -> "210000"), Some(brokerId)))
- assertThrows(classOf[IllegalArgumentException], () =>
alterConfigWithZk(Map("message.max.size" -> "220000"), None))
-
- // Dynamic config updates using ZK should for a different broker that is
not running should succeed
- alterAndVerifyConfig(Map("message.max.size" -> "230000"), Some("2"))
- }
-
@Test
def shouldNotUpdateBrokerConfigIfMalformedConfigUsingZookeeper(): Unit = {
val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
@@ -1331,7 +1229,7 @@ class ConfigCommandTest extends QuorumTestHarness with
Logging {
"--entity-type", "brokers",
"--alter",
"--add-config", "a=="))
- assertThrows(classOf[IllegalArgumentException], () =>
ConfigCommand.alterConfigWithZk(null, createOpts, new
DummyAdminZkClient(zkClient)))
+ assertThrows(classOf[IllegalArgumentException], () =>
ConfigCommand.alterConfigWithZk(null, createOpts, dummyAdminZkClient))
}
@Test
@@ -1351,7 +1249,7 @@ class ConfigCommandTest extends QuorumTestHarness with
Logging {
"--entity-type", "brokers",
"--alter",
"--add-config", "a=[b,c,d=e"))
- assertThrows(classOf[IllegalArgumentException], () =>
ConfigCommand.alterConfigWithZk(null, createOpts, new
DummyAdminZkClient(zkClient)))
+ assertThrows(classOf[IllegalArgumentException], () =>
ConfigCommand.alterConfigWithZk(null, createOpts, dummyAdminZkClient))
}
@Test
@@ -1371,7 +1269,7 @@ class ConfigCommandTest extends QuorumTestHarness with
Logging {
"--entity-type", "topics",
"--alter",
"--delete-config", "missing_config1, missing_config2"))
- assertThrows(classOf[InvalidConfigurationException], () =>
ConfigCommand.alterConfigWithZk(null, createOpts, new
DummyAdminZkClient(zkClient)))
+ assertThrows(classOf[InvalidConfigurationException], () =>
ConfigCommand.alterConfigWithZk(null, createOpts, dummyAdminZkClient))
}
@Test
@@ -1432,7 +1330,7 @@ class ConfigCommandTest extends QuorumTestHarness with
Logging {
val mockBroker: Broker = mock(classOf[Broker])
when(mockZkClient.getBroker(1)).thenReturn(Option(mockBroker))
- assertThrows(classOf[IllegalArgumentException], () =>
ConfigCommand.alterConfigWithZk(mockZkClient, createOpts, new
TestAdminZkClient(zkClient)))
+ assertThrows(classOf[IllegalArgumentException], () =>
ConfigCommand.alterConfigWithZk(mockZkClient, createOpts, new
TestAdminZkClient(null)))
}
@Test
@@ -1452,7 +1350,7 @@ class ConfigCommandTest extends QuorumTestHarness with
Logging {
"--delete-config", mechanism))
val credentials = mutable.Map[String, Properties]()
- case class CredentialChange(user: String, mechanisms: Set[String],
iterations: Int) extends AdminZkClient(zkClient) {
+ case class CredentialChange(user: String, mechanisms: Set[String],
iterations: Int) extends AdminZkClient(null) {
override def fetchEntityConfig(entityType: String, entityName: String):
Properties = {
credentials.getOrElse(entityName, new Properties())
}
@@ -1679,14 +1577,6 @@ class ConfigCommandTest extends QuorumTestHarness with
Logging {
Seq("<default>/clients/client-3", sanitizedPrincipal +
"/clients/client-2"))
}
- private def registerBrokerInZk(id: Int): Unit = {
- zkClient.createTopLevelPaths()
- val securityProtocol = SecurityProtocol.PLAINTEXT
- val endpoint = new EndPoint("localhost", 9092,
ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
- val brokerInfo = BrokerInfo(Broker(id, Seq(endpoint), rack = None),
ApiVersion.latestVersion, jmxPort = 9192)
- zkClient.registerBroker(brokerInfo)
- }
-
class DummyAdminZkClient(zkClient: KafkaZkClient) extends
AdminZkClient(zkClient) {
override def changeBrokerConfig(brokerIds: Seq[Int], configs: Properties):
Unit = {}
override def fetchEntityConfig(entityType: String, entityName: String):
Properties = {new Properties}
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index 9586cf5395..dcbf2cff13 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -16,15 +16,23 @@
*/
package kafka.admin
-import kafka.admin.TopicCommand.{PartitionDescription, TopicCommandOptions}
+import kafka.admin.TopicCommand.{PartitionDescription, TopicCommandOptions,
TopicService}
import kafka.common.AdminCommandFailedException
import kafka.utils.Exit
-import org.apache.kafka.clients.admin.PartitionReassignment
+import org.apache.kafka.clients.admin.{Admin, AdminClientTestUtils,
CreatePartitionsOptions, CreateTopicsOptions, DeleteTopicsOptions,
NewPartitions, NewTopic, PartitionReassignment, TopicDescription}
import org.apache.kafka.common.Node
import org.apache.kafka.common.TopicPartitionInfo
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException
+import org.apache.kafka.common.protocol.Errors
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
+import org.mockito.ArgumentMatcher
+import org.mockito.ArgumentMatchers.{any, argThat, eq => eqThat}
+import org.mockito.Mockito.{mock, times, verify, when}
+import java.util.{Collection, Collections, Optional}
+import scala.collection.Seq
+import scala.concurrent.ExecutionException
import scala.jdk.CollectionConverters._
class TopicCommandTest {
@@ -159,6 +167,74 @@ class TopicCommandTest {
assertEquals(expectedAssignment, actualAssignment)
}
+ @Test
+ def testCreateTopicDoesNotRetryThrottlingQuotaExceededException(): Unit = {
+ val adminClient = mock(classOf[Admin])
+ val topicService = TopicService(adminClient)
+
+ val result = AdminClientTestUtils.createTopicsResult(topicName,
Errors.THROTTLING_QUOTA_EXCEEDED.exception())
+ when(adminClient.createTopics(any(), any())).thenReturn(result)
+
+ assertThrows(classOf[ThrottlingQuotaExceededException],
+ () => topicService.createTopic(new TopicCommandOptions(Array("--topic",
topicName))))
+
+ val expectedNewTopic = new NewTopic(topicName, Optional.empty[Integer](),
Optional.empty[java.lang.Short]())
+ .configs(Map.empty[String, String].asJava)
+
+ verify(adminClient, times(1)).createTopics(
+ eqThat(Set(expectedNewTopic).asJava),
+ argThat((_.shouldRetryOnQuotaViolation() == false):
ArgumentMatcher[CreateTopicsOptions])
+ )
+ }
+
+ @Test
+ def testDeleteTopicDoesNotRetryThrottlingQuotaExceededException(): Unit = {
+ val adminClient = mock(classOf[Admin])
+ val topicService = TopicService(adminClient)
+
+ val listResult = AdminClientTestUtils.listTopicsResult(topicName)
+ when(adminClient.listTopics(any())).thenReturn(listResult)
+
+ val result = AdminClientTestUtils.deleteTopicsResult(topicName,
Errors.THROTTLING_QUOTA_EXCEEDED.exception())
+ when(adminClient.deleteTopics(any[Collection[String]](),
any())).thenReturn(result)
+
+ val exception = assertThrows(classOf[ExecutionException],
+ () => topicService.deleteTopic(new TopicCommandOptions(Array("--topic",
topicName))))
+
assertTrue(exception.getCause.isInstanceOf[ThrottlingQuotaExceededException])
+
+ verify(adminClient, times(1)).deleteTopics(
+ eqThat(Seq(topicName).asJavaCollection),
+ argThat((_.shouldRetryOnQuotaViolation() == false):
ArgumentMatcher[DeleteTopicsOptions])
+ )
+ }
+
+ @Test
+ def testCreatePartitionsDoesNotRetryThrottlingQuotaExceededException(): Unit
= {
+ val adminClient = mock(classOf[Admin])
+ val topicService = TopicService(adminClient)
+
+ val listResult = AdminClientTestUtils.listTopicsResult(topicName)
+ when(adminClient.listTopics(any())).thenReturn(listResult)
+
+ val topicPartitionInfo = new TopicPartitionInfo(0, new Node(0, "", 0),
+ Collections.emptyList(), Collections.emptyList())
+ val describeResult = AdminClientTestUtils.describeTopicsResult(topicName,
new TopicDescription(
+ topicName, false, Collections.singletonList(topicPartitionInfo)))
+
when(adminClient.describeTopics(any(classOf[java.util.Collection[String]]))).thenReturn(describeResult)
+
+ val result = AdminClientTestUtils.createPartitionsResult(topicName,
Errors.THROTTLING_QUOTA_EXCEEDED.exception())
+ when(adminClient.createPartitions(any(), any())).thenReturn(result)
+
+ val exception = assertThrows(classOf[ExecutionException],
+ () => topicService.alterTopic(new TopicCommandOptions(Array("--topic",
topicName, "--partitions", "3"))))
+
assertTrue(exception.getCause.isInstanceOf[ThrottlingQuotaExceededException])
+
+ verify(adminClient, times(1)).createPartitions(
+ argThat((_.get(topicName).totalCount() == 3):
ArgumentMatcher[java.util.Map[String, NewPartitions]]),
+ argThat((_.shouldRetryOnQuotaViolation() == false):
ArgumentMatcher[CreatePartitionsOptions])
+ )
+ }
+
private[this] def assertCheckArgsExitCode(expected: Int, options:
TopicCommandOptions): Unit = {
Exit.setExitProcedure {
(exitCode: Int, _: Option[String]) =>