This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 901c656a1da KAFKA-15746: KRaft support in ControllerMutationQuotaTest (#16620)
901c656a1da is described below

commit 901c656a1da16c8473200c93e57991004fc1371d
Author: Dmitry Werner <[email protected]>
AuthorDate: Tue Jul 30 19:04:16 2024 +0500

    KAFKA-15746: KRaft support in ControllerMutationQuotaTest (#16620)
    
    
    Reviewers: Mickael Maison <[email protected]>
---
 .../kafka/server/ControllerMutationQuotaTest.scala | 89 +++++++++++++++-------
 1 file changed, 61 insertions(+), 28 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
index 86a32d126c2..3c06b33640e 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
 import org.apache.kafka.common.message.DeleteTopicsRequestData
 import org.apache.kafka.common.metrics.KafkaMetric
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.quota.ClientQuotaAlteration
@@ -47,8 +48,11 @@ import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.Assertions.assertTrue
 import org.junit.jupiter.api.Assertions.fail
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
+import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 
 object ControllerMutationQuotaTest {
@@ -106,6 +110,12 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
     properties.put(QuotaConfigs.CONTROLLER_QUOTA_WINDOW_SIZE_SECONDS_CONFIG, ControllerQuotaWindowSizeSeconds.toString)
   }
 
+  override def kraftControllerConfigs(): Seq[Properties] = {
+    val props = super.kraftControllerConfigs()
+    props.head.setProperty(QuotaConfigs.NUM_CONTROLLER_QUOTA_SAMPLES_CONFIG, ControllerQuotaSamples.toString)
+    props
+  }
+
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
@@ -115,8 +125,9 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
     waitUserQuota(ThrottledPrincipal.getName, ControllerMutationRate)
   }
 
-  @Test
-  def testSetUnsetQuota(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSetUnsetQuota(quorum: String): Unit = {
     val rate = 1.5
     val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "User")
     // Default Value
@@ -131,8 +142,9 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
     waitUserQuota(principal.getName, Long.MaxValue)
   }
 
-  @Test
-  def testQuotaMetric(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testQuotaMetric(quorum: String): Unit = {
     asPrincipal(ThrottledPrincipal) {
       // Metric is lazily created
       assertTrue(quotaMetric(principal.getName).isEmpty)
@@ -153,8 +165,9 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
     }
   }
 
-  @Test
-  def testStrictCreateTopicsRequest(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testStrictCreateTopicsRequest(quorum: String): Unit = {
     asPrincipal(ThrottledPrincipal) {
       // Create two topics worth of 30 partitions each. As we use a strict quota, we
       // expect one to be created and one to be rejected.
@@ -176,8 +189,9 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
     }
   }
 
-  @Test
-  def testPermissiveCreateTopicsRequest(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testPermissiveCreateTopicsRequest(quorum: String): Unit = {
     asPrincipal(ThrottledPrincipal) {
       // Create two topics worth of 30 partitions each. As we use a permissive quota, we
       // expect both topics to be created.
@@ -189,8 +203,9 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
     }
   }
 
-  @Test
-  def testUnboundedCreateTopicsRequest(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testUnboundedCreateTopicsRequest(quorum: String): Unit = {
     asPrincipal(UnboundedPrincipal) {
       // Create two topics worth of 30 partitions each. As we use an user without quota, we
       // expect both topics to be created. The throttle time should be equal to 0.
@@ -200,8 +215,9 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
     }
   }
 
-  @Test
-  def testStrictDeleteTopicsRequest(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testStrictDeleteTopicsRequest(quorum: String): Unit = {
     asPrincipal(UnboundedPrincipal) {
       createTopics(TopicsWith30Partitions, StrictCreateTopicsRequestVersion)
     }
@@ -227,8 +243,9 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
     }
   }
 
-  @Test
-  def testPermissiveDeleteTopicsRequest(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testPermissiveDeleteTopicsRequest(quorum: String): Unit = {
     asPrincipal(UnboundedPrincipal) {
       createTopics(TopicsWith30Partitions, StrictCreateTopicsRequestVersion)
     }
@@ -244,8 +261,9 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
     }
   }
 
-  @Test
-  def testUnboundedDeleteTopicsRequest(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testUnboundedDeleteTopicsRequest(quorum: String): Unit = {
     asPrincipal(UnboundedPrincipal) {
       createTopics(TopicsWith30Partitions, StrictCreateTopicsRequestVersion)
 
@@ -257,8 +275,9 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
     }
   }
 
-  @Test
-  def testStrictCreatePartitionsRequest(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testStrictCreatePartitionsRequest(quorum: String): Unit = {
     asPrincipal(UnboundedPrincipal) {
       createTopics(TopicsWithOnePartition, StrictCreatePartitionsRequestVersion)
     }
@@ -284,8 +303,9 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
     }
   }
 
-  @Test
-  def testPermissiveCreatePartitionsRequest(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testPermissiveCreatePartitionsRequest(quorum: String): Unit = {
     asPrincipal(UnboundedPrincipal) {
       createTopics(TopicsWithOnePartition, StrictCreatePartitionsRequestVersion)
     }
@@ -301,8 +321,9 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
     }
   }
 
-  @Test
-  def testUnboundedCreatePartitionsRequest(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testUnboundedCreatePartitionsRequest(quorum: String): Unit = {
     asPrincipal(UnboundedPrincipal) {
       createTopics(TopicsWithOnePartition, StrictCreatePartitionsRequestVersion)
 
@@ -364,23 +385,31 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
   }
 
   private def waitUserQuota(user: String, expectedQuota: Double): Unit = {
-    val quotaManager = servers.head.quotaManagers.controllerMutation
+    val quotaManager = brokers.head.quotaManagers.controllerMutation
+    val controllerQuotaManager =
+      if (isKRaftTest()) Option(controllerServers.head.quotaManagers.controllerMutation)
+      else Option.empty
     var actualQuota = Double.MinValue
 
     TestUtils.waitUntilTrue(() => {
       actualQuota = quotaManager.quota(user, "").bound()
-      expectedQuota == actualQuota
+      if (controllerQuotaManager.isDefined)
+        expectedQuota == actualQuota && expectedQuota == controllerQuotaManager.get.quota(user, "").bound()
+      else
+        expectedQuota == actualQuota
     }, s"Quota of $user is not $expectedQuota but $actualQuota")
   }
 
   private def quotaMetric(user: String): Option[KafkaMetric] = {
-    val metrics = servers.head.metrics
+    val metrics =
+      if (isKRaftTest()) controllerServers.head.metrics
+      else brokers.head.metrics
     val metricName = metrics.metricName(
       "tokens",
       QuotaType.ControllerMutation.toString,
       "Tracking remaining tokens in the token bucket per user/client-id",
       Map(DefaultTags.User -> user, DefaultTags.ClientId -> "").asJava)
-    Option(servers.head.metrics.metric(metricName))
+    Option(metrics.metric(metricName))
   }
 
   private def waitQuotaMetric(user: String, expectedQuota: Double): Unit = {
@@ -416,6 +445,10 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
 
   private def sendAlterClientQuotasRequest(entries: Iterable[ClientQuotaAlteration]): AlterClientQuotasResponse = {
     val request = new AlterClientQuotasRequest.Builder(entries.asJavaCollection, false).build()
-    connectAndReceive[AlterClientQuotasResponse](request, destination = controllerSocketServer)
+    connectAndReceive[AlterClientQuotasResponse](
+      request,
+      destination = controllerSocketServer,
+      if (isKRaftTest()) ListenerName.normalised("CONTROLLER") else listenerName
+    )
   }
 }

Reply via email to