This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 901c656a1da KAFKA-15746: KRaft support in ControllerMutationQuotaTest (#16620)
901c656a1da is described below
commit 901c656a1da16c8473200c93e57991004fc1371d
Author: Dmitry Werner <[email protected]>
AuthorDate: Tue Jul 30 19:04:16 2024 +0500
KAFKA-15746: KRaft support in ControllerMutationQuotaTest (#16620)
Reviewers: Mickael Maison <[email protected]>
---
.../kafka/server/ControllerMutationQuotaTest.scala | 89 +++++++++++++++-------
1 file changed, 61 insertions(+), 28 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
index 86a32d126c2..3c06b33640e 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.message.DeleteTopicsRequestData
import org.apache.kafka.common.metrics.KafkaMetric
+import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.quota.ClientQuotaAlteration
@@ -47,8 +48,11 @@ import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Assertions.fail
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+import scala.collection.Seq
import scala.jdk.CollectionConverters._
object ControllerMutationQuotaTest {
@@ -106,6 +110,12 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
properties.put(QuotaConfigs.CONTROLLER_QUOTA_WINDOW_SIZE_SECONDS_CONFIG,
ControllerQuotaWindowSizeSeconds.toString)
}
+ override def kraftControllerConfigs(): Seq[Properties] = {
+ val props = super.kraftControllerConfigs()
+ props.head.setProperty(QuotaConfigs.NUM_CONTROLLER_QUOTA_SAMPLES_CONFIG,
ControllerQuotaSamples.toString)
+ props
+ }
+
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
@@ -115,8 +125,9 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
waitUserQuota(ThrottledPrincipal.getName, ControllerMutationRate)
}
- @Test
- def testSetUnsetQuota(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testSetUnsetQuota(quorum: String): Unit = {
val rate = 1.5
val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "User")
// Default Value
@@ -131,8 +142,9 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
waitUserQuota(principal.getName, Long.MaxValue)
}
- @Test
- def testQuotaMetric(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testQuotaMetric(quorum: String): Unit = {
asPrincipal(ThrottledPrincipal) {
// Metric is lazily created
assertTrue(quotaMetric(principal.getName).isEmpty)
@@ -153,8 +165,9 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
}
}
- @Test
- def testStrictCreateTopicsRequest(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testStrictCreateTopicsRequest(quorum: String): Unit = {
asPrincipal(ThrottledPrincipal) {
// Create two topics worth of 30 partitions each. As we use a strict
quota, we
// expect one to be created and one to be rejected.
@@ -176,8 +189,9 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
}
}
- @Test
- def testPermissiveCreateTopicsRequest(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testPermissiveCreateTopicsRequest(quorum: String): Unit = {
asPrincipal(ThrottledPrincipal) {
// Create two topics worth of 30 partitions each. As we use a permissive
quota, we
// expect both topics to be created.
@@ -189,8 +203,9 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
}
}
- @Test
- def testUnboundedCreateTopicsRequest(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testUnboundedCreateTopicsRequest(quorum: String): Unit = {
asPrincipal(UnboundedPrincipal) {
// Create two topics worth of 30 partitions each. As we use an user
without quota, we
// expect both topics to be created. The throttle time should be equal
to 0.
@@ -200,8 +215,9 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
}
}
- @Test
- def testStrictDeleteTopicsRequest(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testStrictDeleteTopicsRequest(quorum: String): Unit = {
asPrincipal(UnboundedPrincipal) {
createTopics(TopicsWith30Partitions, StrictCreateTopicsRequestVersion)
}
@@ -227,8 +243,9 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
}
}
- @Test
- def testPermissiveDeleteTopicsRequest(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testPermissiveDeleteTopicsRequest(quorum: String): Unit = {
asPrincipal(UnboundedPrincipal) {
createTopics(TopicsWith30Partitions, StrictCreateTopicsRequestVersion)
}
@@ -244,8 +261,9 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
}
}
- @Test
- def testUnboundedDeleteTopicsRequest(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testUnboundedDeleteTopicsRequest(quorum: String): Unit = {
asPrincipal(UnboundedPrincipal) {
createTopics(TopicsWith30Partitions, StrictCreateTopicsRequestVersion)
@@ -257,8 +275,9 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
}
}
- @Test
- def testStrictCreatePartitionsRequest(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testStrictCreatePartitionsRequest(quorum: String): Unit = {
asPrincipal(UnboundedPrincipal) {
createTopics(TopicsWithOnePartition,
StrictCreatePartitionsRequestVersion)
}
@@ -284,8 +303,9 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
}
}
- @Test
- def testPermissiveCreatePartitionsRequest(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testPermissiveCreatePartitionsRequest(quorum: String): Unit = {
asPrincipal(UnboundedPrincipal) {
createTopics(TopicsWithOnePartition,
StrictCreatePartitionsRequestVersion)
}
@@ -301,8 +321,9 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
}
}
- @Test
- def testUnboundedCreatePartitionsRequest(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testUnboundedCreatePartitionsRequest(quorum: String): Unit = {
asPrincipal(UnboundedPrincipal) {
createTopics(TopicsWithOnePartition,
StrictCreatePartitionsRequestVersion)
@@ -364,23 +385,31 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
}
private def waitUserQuota(user: String, expectedQuota: Double): Unit = {
- val quotaManager = servers.head.quotaManagers.controllerMutation
+ val quotaManager = brokers.head.quotaManagers.controllerMutation
+ val controllerQuotaManager =
+ if (isKRaftTest())
Option(controllerServers.head.quotaManagers.controllerMutation)
+ else Option.empty
var actualQuota = Double.MinValue
TestUtils.waitUntilTrue(() => {
actualQuota = quotaManager.quota(user, "").bound()
- expectedQuota == actualQuota
+ if (controllerQuotaManager.isDefined)
+ expectedQuota == actualQuota && expectedQuota ==
controllerQuotaManager.get.quota(user, "").bound()
+ else
+ expectedQuota == actualQuota
}, s"Quota of $user is not $expectedQuota but $actualQuota")
}
private def quotaMetric(user: String): Option[KafkaMetric] = {
- val metrics = servers.head.metrics
+ val metrics =
+ if (isKRaftTest()) controllerServers.head.metrics
+ else brokers.head.metrics
val metricName = metrics.metricName(
"tokens",
QuotaType.ControllerMutation.toString,
"Tracking remaining tokens in the token bucket per user/client-id",
Map(DefaultTags.User -> user, DefaultTags.ClientId -> "").asJava)
- Option(servers.head.metrics.metric(metricName))
+ Option(metrics.metric(metricName))
}
private def waitQuotaMetric(user: String, expectedQuota: Double): Unit = {
@@ -416,6 +445,10 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
private def sendAlterClientQuotasRequest(entries:
Iterable[ClientQuotaAlteration]): AlterClientQuotasResponse = {
val request = new
AlterClientQuotasRequest.Builder(entries.asJavaCollection, false).build()
- connectAndReceive[AlterClientQuotasResponse](request, destination =
controllerSocketServer)
+ connectAndReceive[AlterClientQuotasResponse](
+ request,
+ destination = controllerSocketServer,
+ if (isKRaftTest()) ListenerName.normalised("CONTROLLER") else
listenerName
+ )
}
}