dengziming commented on code in PR #12614:
URL: https://github.com/apache/kafka/pull/12614#discussion_r966769259


##########
core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala:
##########
@@ -90,93 +104,142 @@ class RequestQuotaTest extends BaseRequestTest {
     super.setUp(testInfo)
 
     createTopic(topic, numPartitions)
-    leaderNode = servers.head
+    leaderNode = brokers.head
 
     // Change default client-id request quota to a small value and a single 
unthrottledClient with a large quota
     val quotaProps = new Properties()
     quotaProps.put(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, "0.01")
     quotaProps.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, "2000")
     quotaProps.put(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, "2000")
-    adminZkClient.changeClientIdConfig("<default>", quotaProps)
+    changeClientIdConfig("<default>", quotaProps)
     quotaProps.put(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, "2000")
-    
adminZkClient.changeClientIdConfig(Sanitizer.sanitize(unthrottledClientId), 
quotaProps)
+    changeClientIdConfig(Sanitizer.sanitize(unthrottledClientId), quotaProps)
 
     // Client ids with small producer and consumer (fetch) quotas. Quota 
values were picked so that both
     // producer/consumer and request quotas are violated on the first 
produce/consume operation, and the delay due to
     // producer/consumer quota violation will be longer than the delay due to 
request quota violation.
     quotaProps.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, "1")
     quotaProps.put(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, "0.01")
-    
adminZkClient.changeClientIdConfig(Sanitizer.sanitize(smallQuotaProducerClientId),
 quotaProps)
+    changeClientIdConfig(Sanitizer.sanitize(smallQuotaProducerClientId), 
quotaProps)
     quotaProps.put(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, "1")
     quotaProps.put(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, "0.01")
-    
adminZkClient.changeClientIdConfig(Sanitizer.sanitize(smallQuotaConsumerClientId),
 quotaProps)
+    changeClientIdConfig(Sanitizer.sanitize(smallQuotaConsumerClientId), 
quotaProps)
 
     TestUtils.retry(20000) {
-      val quotaManager = servers.head.dataPlaneRequestProcessor.quotas.request
+      val quotaManager = brokers.head.dataPlaneRequestProcessor.quotas.request
       assertEquals(Quota.upperBound(0.01), quotaManager.quota("some-user", 
"some-client"), s"Default request quota not set")
       assertEquals(Quota.upperBound(2000), quotaManager.quota("some-user", 
unthrottledClientId), s"Request quota override not set")
-      val produceQuotaManager = 
servers.head.dataPlaneRequestProcessor.quotas.produce
+      val produceQuotaManager = 
brokers.head.dataPlaneRequestProcessor.quotas.produce
       assertEquals(Quota.upperBound(1), produceQuotaManager.quota("some-user", 
smallQuotaProducerClientId), s"Produce quota override not set")
-      val consumeQuotaManager = 
servers.head.dataPlaneRequestProcessor.quotas.fetch
+      val consumeQuotaManager = 
brokers.head.dataPlaneRequestProcessor.quotas.fetch
       assertEquals(Quota.upperBound(1), consumeQuotaManager.quota("some-user", 
smallQuotaConsumerClientId), s"Consume quota override not set")
     }
   }
 
+  private def changeClientIdConfig(sanitizedClientId: String, configs: 
Properties): Unit = {
+    if (isKRaftTest()) {
+      val admin = createAdminClient()
+      admin.alterClientQuotas(Collections.singleton(
+        new ClientQuotaAlteration(
+          new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> (if 
(sanitizedClientId == "<default>") null else sanitizedClientId)).asJava),
+          configs.asScala.map { case (key, value) => new 
ClientQuotaAlteration.Op(key, value.toDouble)}.toList.asJava)
+      )).all().get()
+    } else {
+      adminZkClient.changeClientIdConfig(sanitizedClientId, configs)
+    }
+  }
+
   @AfterEach
   override def tearDown(): Unit = {
     try executor.shutdownNow()
     finally super.tearDown()
   }
 
-  @Test
-  def testResponseThrottleTime(): Unit = {
-    for (apiKey <- RequestQuotaTest.ClientActions ++ 
RequestQuotaTest.ClusterActionsWithThrottle)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testResponseThrottleTime(quorum: String): Unit = {
+    val apiKeys = if (isKRaftTest()) {
+      (clientActions ++ clusterActionsWithThrottle).filterNot(_.forwardable)
+    } else {
+      clientActions ++ clusterActionsWithThrottle
+    }
+    for (apiKey <- apiKeys)
       submitTest(apiKey, () => checkRequestThrottleTime(apiKey))
 
     waitAndCheckResults()
   }
 
-  @Test
-  def testResponseThrottleTimeWhenBothProduceAndRequestQuotasViolated(): Unit 
= {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testResponseThrottleTimeWhenBothProduceAndRequestQuotasViolated(quorum: 
String): Unit = {
     submitTest(ApiKeys.PRODUCE, () => 
checkSmallQuotaProducerRequestThrottleTime())
     waitAndCheckResults()
   }
 
-  @Test
-  def testResponseThrottleTimeWhenBothFetchAndRequestQuotasViolated(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testResponseThrottleTimeWhenBothFetchAndRequestQuotasViolated(quorum: 
String): Unit = {
     submitTest(ApiKeys.FETCH, () => 
checkSmallQuotaConsumerRequestThrottleTime())
     waitAndCheckResults()
   }
 
-  @Test
-  def testUnthrottledClient(): Unit = {
-    for (apiKey <- RequestQuotaTest.ClientActions) {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testUnthrottledClient(quorum: String): Unit = {
+    for (apiKey <- clientActions) {
       submitTest(apiKey, () => checkUnthrottledClient(apiKey))
     }
 
     waitAndCheckResults()
   }
 
-  @Test
-  def testExemptRequestTime(): Unit = {
-    for (apiKey <- RequestQuotaTest.ClusterActions -- 
RequestQuotaTest.ClusterActionsWithThrottle) {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testExemptRequestTime(quorum: String): Unit = {
+    for (apiKey <- clusterActions -- clusterActionsWithThrottle) {
       submitTest(apiKey, () => checkExemptRequestMetric(apiKey))
     }
 
     waitAndCheckResults()
   }
 
-  @Test
-  def testUnauthorizedThrottle(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testUnauthorizedThrottle(quorum: String): Unit = {
     RequestQuotaTest.principal = RequestQuotaTest.UnauthorizedPrincipal
 
-    for (apiKey <- ApiKeys.zkBrokerApis.asScala) {
+    val apiKeys = if (isKRaftTest()) 
ApiKeys.kraftBrokerApis().asScala.filterNot(_.forwardable) else 
ApiKeys.zkBrokerApis.asScala

Review Comment:
   ditto



##########
core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala:
##########
@@ -90,93 +104,142 @@ class RequestQuotaTest extends BaseRequestTest {
     super.setUp(testInfo)
 
     createTopic(topic, numPartitions)
-    leaderNode = servers.head
+    leaderNode = brokers.head
 
     // Change default client-id request quota to a small value and a single 
unthrottledClient with a large quota
     val quotaProps = new Properties()
     quotaProps.put(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, "0.01")
     quotaProps.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, "2000")
     quotaProps.put(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, "2000")
-    adminZkClient.changeClientIdConfig("<default>", quotaProps)
+    changeClientIdConfig("<default>", quotaProps)
     quotaProps.put(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, "2000")
-    
adminZkClient.changeClientIdConfig(Sanitizer.sanitize(unthrottledClientId), 
quotaProps)
+    changeClientIdConfig(Sanitizer.sanitize(unthrottledClientId), quotaProps)
 
     // Client ids with small producer and consumer (fetch) quotas. Quota 
values were picked so that both
     // producer/consumer and request quotas are violated on the first 
produce/consume operation, and the delay due to
     // producer/consumer quota violation will be longer than the delay due to 
request quota violation.
     quotaProps.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, "1")
     quotaProps.put(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, "0.01")
-    
adminZkClient.changeClientIdConfig(Sanitizer.sanitize(smallQuotaProducerClientId),
 quotaProps)
+    changeClientIdConfig(Sanitizer.sanitize(smallQuotaProducerClientId), 
quotaProps)
     quotaProps.put(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, "1")
     quotaProps.put(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, "0.01")
-    
adminZkClient.changeClientIdConfig(Sanitizer.sanitize(smallQuotaConsumerClientId),
 quotaProps)
+    changeClientIdConfig(Sanitizer.sanitize(smallQuotaConsumerClientId), 
quotaProps)
 
     TestUtils.retry(20000) {
-      val quotaManager = servers.head.dataPlaneRequestProcessor.quotas.request
+      val quotaManager = brokers.head.dataPlaneRequestProcessor.quotas.request
       assertEquals(Quota.upperBound(0.01), quotaManager.quota("some-user", 
"some-client"), s"Default request quota not set")
       assertEquals(Quota.upperBound(2000), quotaManager.quota("some-user", 
unthrottledClientId), s"Request quota override not set")
-      val produceQuotaManager = 
servers.head.dataPlaneRequestProcessor.quotas.produce
+      val produceQuotaManager = 
brokers.head.dataPlaneRequestProcessor.quotas.produce
       assertEquals(Quota.upperBound(1), produceQuotaManager.quota("some-user", 
smallQuotaProducerClientId), s"Produce quota override not set")
-      val consumeQuotaManager = 
servers.head.dataPlaneRequestProcessor.quotas.fetch
+      val consumeQuotaManager = 
brokers.head.dataPlaneRequestProcessor.quotas.fetch
       assertEquals(Quota.upperBound(1), consumeQuotaManager.quota("some-user", 
smallQuotaConsumerClientId), s"Consume quota override not set")
     }
   }
 
+  private def changeClientIdConfig(sanitizedClientId: String, configs: 
Properties): Unit = {
+    if (isKRaftTest()) {
+      val admin = createAdminClient()
+      admin.alterClientQuotas(Collections.singleton(
+        new ClientQuotaAlteration(
+          new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> (if 
(sanitizedClientId == "<default>") null else sanitizedClientId)).asJava),
+          configs.asScala.map { case (key, value) => new 
ClientQuotaAlteration.Op(key, value.toDouble)}.toList.asJava)
+      )).all().get()
+    } else {
+      adminZkClient.changeClientIdConfig(sanitizedClientId, configs)
+    }
+  }
+
   @AfterEach
   override def tearDown(): Unit = {
     try executor.shutdownNow()
     finally super.tearDown()
   }
 
-  @Test
-  def testResponseThrottleTime(): Unit = {
-    for (apiKey <- RequestQuotaTest.ClientActions ++ 
RequestQuotaTest.ClusterActionsWithThrottle)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testResponseThrottleTime(quorum: String): Unit = {
+    val apiKeys = if (isKRaftTest()) {
+      (clientActions ++ clusterActionsWithThrottle).filterNot(_.forwardable)

Review Comment:
   Currently, `ClientQuotasDelta` is only published to brokers, whereas 
`ControllerApis.QuotaManagers` is not updated after client quota configs 
are changed. As a result, no requests are throttled in the controller, so we 
need to filter out forwardable ApiKeys here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to