This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 9a2f202a1e9 MINOR: Move ClientQuotasRequestTest to server module (#20053) 9a2f202a1e9 is described below commit 9a2f202a1e99a807b6ba91bb3c4b9148089f0b02 Author: Lan Ding <isdin...@163.com> AuthorDate: Sun Jul 20 23:14:55 2025 +0800 MINOR: Move ClientQuotasRequestTest to server module (#20053) 1. Move ClientQuotasRequestTest to server module. 2. Rewrite ClientQuotasRequestTest in Java. Reviewers: Jhen-Yung Hsu <jhenyung...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com> --- .../kafka/server/ClientQuotasRequestTest.scala | 592 ----------------- .../server/quota/ClientQuotasRequestTest.java | 726 +++++++++++++++++++++ 2 files changed, 726 insertions(+), 592 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala deleted file mode 100644 index dafd11d033c..00000000000 --- a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala +++ /dev/null @@ -1,592 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.server - -import java.net.InetAddress -import java.util -import java.util.concurrent.{ExecutionException, TimeUnit} -import org.apache.kafka.common.test.api.ClusterTest -import kafka.utils.TestUtils -import org.apache.kafka.clients.admin.{ScramCredentialInfo, ScramMechanism, UserScramCredentialUpsertion} -import org.apache.kafka.common.errors.{InvalidRequestException, UnsupportedVersionException} -import org.apache.kafka.common.internals.KafkaFutureImpl -import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent} -import org.apache.kafka.common.requests.{AlterClientQuotasRequest, AlterClientQuotasResponse, DescribeClientQuotasRequest, DescribeClientQuotasResponse} -import org.apache.kafka.common.test.ClusterInstance -import org.apache.kafka.server.IntegrationTestUtils -import org.apache.kafka.server.config.QuotaConfig -import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.Disabled - -import scala.jdk.CollectionConverters._ - -class ClientQuotasRequestTest(cluster: ClusterInstance) { - @ClusterTest - def testAlterClientQuotasRequest(): Unit = { - - val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "user", ClientQuotaEntity.CLIENT_ID -> "client-id").asJava) - - // Expect an empty configuration. - verifyDescribeEntityQuotas(entity, Map.empty) - - // Add two configuration entries. - alterEntityQuotas(entity, Map( - QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(10000.0), - QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> Some(20000.0) - ), validateOnly = false) - - verifyDescribeEntityQuotas(entity, Map( - QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 10000.0, - QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0 - )) - - // Update an existing entry. - alterEntityQuotas(entity, Map( - QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(15000.0) - ), validateOnly = false) - - verifyDescribeEntityQuotas(entity, Map( - QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 15000.0, - QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0 - )) - - // Remove an existing configuration entry. - alterEntityQuotas(entity, Map( - QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> None - ), validateOnly = false) - - verifyDescribeEntityQuotas(entity, Map( - QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0 - )) - - // Remove a non-existent configuration entry. This should make no changes. - alterEntityQuotas(entity, Map( - QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> None - ), validateOnly = false) - - verifyDescribeEntityQuotas(entity, Map( - QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0 - )) - - // Add back a deleted configuration entry. - alterEntityQuotas(entity, Map( - QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(5000.0) - ), validateOnly = false) - - verifyDescribeEntityQuotas(entity, Map( - QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 5000.0, - QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0 - )) - - // Perform a mixed update. - alterEntityQuotas(entity, Map( - QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(20000.0), - QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> None, - QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> Some(12.3) - ), validateOnly = false) - - verifyDescribeEntityQuotas(entity, Map( - QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0, - QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> 12.3 - )) - } - - @ClusterTest - def testAlterClientQuotasRequestValidateOnly(): Unit = { - val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "user").asJava) - - // Set up a configuration. - alterEntityQuotas(entity, Map( - QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(20000.0), - QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> Some(23.45) - ), validateOnly = false) - - verifyDescribeEntityQuotas(entity, Map( - QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0, - QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> 23.45 - )) - - // Validate-only addition. - alterEntityQuotas(entity, Map( - QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> Some(50000.0) - ), validateOnly = true) - - verifyDescribeEntityQuotas(entity, Map( - QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0, - QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> 23.45 - )) - - // Validate-only modification. - alterEntityQuotas(entity, Map( - QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(10000.0) - ), validateOnly = true) - - verifyDescribeEntityQuotas(entity, Map( - QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0, - QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> 23.45 - )) - - // Validate-only removal. - alterEntityQuotas(entity, Map( - QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> None - ), validateOnly = true) - - verifyDescribeEntityQuotas(entity, Map( - QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0, - QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> 23.45 - )) - - // Validate-only mixed update. - alterEntityQuotas(entity, Map( - QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(10000.0), - QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> Some(50000.0), - QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> None - ), validateOnly = true) - - verifyDescribeEntityQuotas(entity, Map( - QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0, - QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> 23.45 - )) - } - - @Disabled("TODO: KAFKA-17630 - Convert ClientQuotasRequestTest#testClientQuotasForScramUsers to kraft") - @ClusterTest - def testClientQuotasForScramUsers(): Unit = { - val userName = "user" - - val admin = cluster.admin() - try { - val results = admin.alterUserScramCredentials(util.Arrays.asList( - new UserScramCredentialUpsertion(userName, new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "password"))) - results.all.get - - val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> userName).asJava) - - verifyDescribeEntityQuotas(entity, Map.empty) - - alterEntityQuotas(entity, Map( - QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(10000.0), - QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> Some(20000.0) - ), validateOnly = false) - - verifyDescribeEntityQuotas(entity, Map( - QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 10000.0, - QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0 - )) - } finally { - admin.close() - } - } - - @ClusterTest - def testAlterIpQuotasRequest(): Unit = { - val knownHost = "1.2.3.4" - val unknownHost = "2.3.4.5" - val entity = toIpEntity(Some(knownHost)) - val defaultEntity = toIpEntity(Some(null)) - val entityFilter = ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.IP, knownHost) - val defaultEntityFilter = ClientQuotaFilterComponent.ofDefaultEntity(ClientQuotaEntity.IP) - val allIpEntityFilter = ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.IP) - - def verifyIpQuotas(entityFilter: ClientQuotaFilterComponent, expectedMatches: Map[ClientQuotaEntity, Double]): Unit = { - TestUtils.tryUntilNoAssertionError() { - val result = describeClientQuotas(ClientQuotaFilter.containsOnly(List(entityFilter).asJava)) - assertEquals(expectedMatches.keySet, result.asScala.keySet) - result.asScala.foreach { case (entity, props) => - assertEquals(Set(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG), props.asScala.keySet) - assertEquals(expectedMatches(entity), props.get(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG)) - val entityName = entity.entries.get(ClientQuotaEntity.IP) - // ClientQuotaEntity with null name maps to default entity - val entityIp = if (entityName == null) - InetAddress.getByName(unknownHost) - else - InetAddress.getByName(entityName) - var currentServerQuota = 0 - currentServerQuota = cluster.brokers().values().asScala.head.socketServer.connectionQuotas.connectionRateForIp(entityIp) - assertTrue(Math.abs(expectedMatches(entity) - currentServerQuota) < 0.01, - s"Connection quota of $entity is not ${expectedMatches(entity)} but $currentServerQuota") - } - } - } - - // Expect an empty configuration. - verifyIpQuotas(allIpEntityFilter, Map.empty) - - // Add a configuration entry. - alterEntityQuotas(entity, Map(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG -> Some(100.0)), validateOnly = false) - verifyIpQuotas(entityFilter, Map(entity -> 100.0)) - - // update existing entry - alterEntityQuotas(entity, Map(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG -> Some(150.0)), validateOnly = false) - verifyIpQuotas(entityFilter, Map(entity -> 150.0)) - - // update default value - alterEntityQuotas(defaultEntity, Map(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG -> Some(200.0)), validateOnly = false) - verifyIpQuotas(defaultEntityFilter, Map(defaultEntity -> 200.0)) - - // describe all IP quotas - verifyIpQuotas(allIpEntityFilter, Map(entity -> 150.0, defaultEntity -> 200.0)) - - // remove entry - alterEntityQuotas(entity, Map(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG -> None), validateOnly = false) - verifyIpQuotas(entityFilter, Map.empty) - - // remove default value - alterEntityQuotas(defaultEntity, Map(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG -> None), validateOnly = false) - verifyIpQuotas(allIpEntityFilter, Map.empty) - } - - @ClusterTest - def testAlterClientQuotasInvalidRequests(): Unit = { - var entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "").asJava) - assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> Some(12.34)), validateOnly = true)) - - entity = new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> "").asJava) - assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> Some(12.34)), validateOnly = true)) - - entity = new ClientQuotaEntity(Map("" -> "name").asJava) - assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> Some(12.34)), validateOnly = true)) - - entity = new ClientQuotaEntity(Map.empty.asJava) - assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map(QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(10000.5)), validateOnly = true)) - - entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "user").asJava) - assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map("bad" -> Some(1.0)), validateOnly = true)) - - entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "user").asJava) - assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map(QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(10000.5)), validateOnly = true)) - } - - private def expectInvalidRequestWithMessage(runnable: => Unit, expectedMessage: String): Unit = { - val exception = assertThrows(classOf[InvalidRequestException], () => runnable) - assertTrue(exception.getMessage.contains(expectedMessage), s"Expected message $exception to contain $expectedMessage") - } - - @ClusterTest - def testAlterClientQuotasInvalidEntityCombination(): Unit = { - val userAndIpEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "user", ClientQuotaEntity.IP -> "1.2.3.4").asJava) - val clientAndIpEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> "client", ClientQuotaEntity.IP -> "1.2.3.4").asJava) - val expectedExceptionMessage = "Invalid quota entity combination" - expectInvalidRequestWithMessage(alterEntityQuotas(userAndIpEntity, Map(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> Some(12.34)), - validateOnly = true), expectedExceptionMessage) - expectInvalidRequestWithMessage(alterEntityQuotas(clientAndIpEntity, Map(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> Some(12.34)), - validateOnly = true), expectedExceptionMessage) - } - - @ClusterTest - def testAlterClientQuotasBadIp(): Unit = { - val invalidHostPatternEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.IP -> "not a valid host because it has spaces").asJava) - val unresolvableHostEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.IP -> "RFC2606.invalid").asJava) - val expectedExceptionMessage = "not a valid IP" - expectInvalidRequestWithMessage(alterEntityQuotas(invalidHostPatternEntity, Map(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG -> Some(50.0)), - validateOnly = true), expectedExceptionMessage) - expectInvalidRequestWithMessage(alterEntityQuotas(unresolvableHostEntity, Map(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG -> Some(50.0)), - validateOnly = true), expectedExceptionMessage) - } - - @ClusterTest - def testDescribeClientQuotasInvalidFilterCombination(): Unit = { - val ipFilterComponent = ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.IP) - val userFilterComponent = ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER) - val clientIdFilterComponent = ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.CLIENT_ID) - val expectedExceptionMessage = "Invalid entity filter component combination" - expectInvalidRequestWithMessage(describeClientQuotas(ClientQuotaFilter.contains(List(ipFilterComponent, userFilterComponent).asJava)), - expectedExceptionMessage) - expectInvalidRequestWithMessage(describeClientQuotas(ClientQuotaFilter.contains(List(ipFilterComponent, clientIdFilterComponent).asJava)), - expectedExceptionMessage) - } - - // Entities to be matched against. - private val matchUserClientEntities = List( - (Some("user-1"), Some("client-id-1"), 50.50), - (Some("user-2"), Some("client-id-1"), 51.51), - (Some("user-3"), Some("client-id-2"), 52.52), - (Some(null), Some("client-id-1"), 53.53), - (Some("user-1"), Some(null), 54.54), - (Some("user-3"), Some(null), 55.55), - (Some("user-1"), None, 56.56), - (Some("user-2"), None, 57.57), - (Some("user-3"), None, 58.58), - (Some(null), None, 59.59), - (None, Some("client-id-2"), 60.60) - ).map { case (u, c, v) => (toClientEntity(u, c), v) } - - private val matchIpEntities = List( - (Some("1.2.3.4"), 10.0), - (Some("2.3.4.5"), 20.0) - ).map { case (ip, quota) => (toIpEntity(ip), quota)} - - private def setupDescribeClientQuotasMatchTest(): Unit = { - val userClientQuotas = matchUserClientEntities.map { case (e, v) => - e -> Map((QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Some(v))) - }.toMap - val ipQuotas = matchIpEntities.map { case (e, v) => - e -> Map((QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG, Some(v))) - }.toMap - val result = alterClientQuotas(userClientQuotas ++ ipQuotas, validateOnly = false) - (matchUserClientEntities ++ matchIpEntities).foreach(e => result(e._1).get(10, TimeUnit.SECONDS)) - } - - @ClusterTest - def testDescribeClientQuotasMatchExact(): Unit = { - setupDescribeClientQuotasMatchTest() - - def matchEntity(entity: ClientQuotaEntity) = { - val components = entity.entries.asScala.map { case (entityType, entityName) => - entityName match { - case null => ClientQuotaFilterComponent.ofDefaultEntity(entityType) - case name => ClientQuotaFilterComponent.ofEntity(entityType, name) - } - } - describeClientQuotas(ClientQuotaFilter.containsOnly(components.toList.asJava)) - } - - // Test exact matches. - matchUserClientEntities.foreach { case (e, v) => - TestUtils.tryUntilNoAssertionError() { - val result = matchEntity(e) - assertEquals(1, result.size) - assertTrue(result.get(e) != null) - val value = result.get(e).get(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG) - assertNotNull(value) - assertEquals(value, v, 1e-6) - } - } - - // Entities not contained in `matchEntityList`. - val notMatchEntities = List( - (Some("user-1"), Some("client-id-2")), - (Some("user-3"), Some("client-id-1")), - (Some("user-2"), Some(null)), - (Some("user-4"), None), - (Some(null), Some("client-id-2")), - (None, Some("client-id-1")), - (None, Some("client-id-3")), - ).map { case (u, c) => - new ClientQuotaEntity((u.map((ClientQuotaEntity.USER, _)) ++ - c.map((ClientQuotaEntity.CLIENT_ID, _))).toMap.asJava) - } - - // Verify exact matches of the non-matches returns empty. - notMatchEntities.foreach { e => - val result = matchEntity(e) - assertEquals(0, result.size) - } - } - - @ClusterTest - def testDescribeClientQuotasMatchPartial(): Unit = { - setupDescribeClientQuotasMatchTest() - - def testMatchEntities(filter: ClientQuotaFilter, expectedMatchSize: Int, partition: ClientQuotaEntity => Boolean): Unit = { - TestUtils.tryUntilNoAssertionError() { - val result = describeClientQuotas(filter) - val (expectedMatches, _) = (matchUserClientEntities ++ matchIpEntities).partition(e => partition(e._1)) - assertEquals(expectedMatchSize, expectedMatches.size) // for test verification - assertEquals(expectedMatchSize, result.size, s"Failed to match $expectedMatchSize entities for $filter") - val expectedMatchesMap = expectedMatches.toMap - matchUserClientEntities.foreach { case (entity, expectedValue) => - if (expectedMatchesMap.contains(entity)) { - val config = result.get(entity) - assertNotNull(config) - val value = config.get(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG) - assertNotNull(value) - assertEquals(expectedValue, value, 1e-6) - } else { - assertNull(result.get(entity)) - } - } - matchIpEntities.foreach { case (entity, expectedValue) => - if (expectedMatchesMap.contains(entity)) { - val config = result.get(entity) - assertNotNull(config) - val value = config.get(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG) - assertNotNull(value) - assertEquals(expectedValue, value, 1e-6) - } else { - assertNull(result.get(entity)) - } - } - } - } - - // Match open-ended existing user. - testMatchEntities( - ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.USER, "user-1")).asJava), 3, - entity => entity.entries.get(ClientQuotaEntity.USER) == "user-1" - ) - - // Match open-ended non-existent user. - testMatchEntities( - ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.USER, "unknown")).asJava), 0, - entity => false - ) - - // Match open-ended existing client ID. - testMatchEntities( - ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.CLIENT_ID, "client-id-2")).asJava), 2, - entity => entity.entries.get(ClientQuotaEntity.CLIENT_ID) == "client-id-2" - ) - - // Match open-ended default user. - testMatchEntities( - ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofDefaultEntity(ClientQuotaEntity.USER)).asJava), 2, - entity => entity.entries.containsKey(ClientQuotaEntity.USER) && entity.entries.get(ClientQuotaEntity.USER) == null - ) - - // Match close-ended existing user. - testMatchEntities( - ClientQuotaFilter.containsOnly(List(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.USER, "user-2")).asJava), 1, - entity => entity.entries.get(ClientQuotaEntity.USER) == "user-2" && !entity.entries.containsKey(ClientQuotaEntity.CLIENT_ID) - ) - - // Match close-ended existing client ID that has no matching entity. - testMatchEntities( - ClientQuotaFilter.containsOnly(List(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.CLIENT_ID, "client-id-1")).asJava), 0, - entity => false - ) - - // Match against all entities with the user type in a close-ended match. - testMatchEntities( - ClientQuotaFilter.containsOnly(List(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER)).asJava), 4, - entity => entity.entries.containsKey(ClientQuotaEntity.USER) && !entity.entries.containsKey(ClientQuotaEntity.CLIENT_ID) - ) - - // Match against all entities with the user type in an open-ended match. - testMatchEntities( - ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER)).asJava), 10, - entity => entity.entries.containsKey(ClientQuotaEntity.USER) - ) - - // Match against all entities with the client ID type in a close-ended match. - testMatchEntities( - ClientQuotaFilter.containsOnly(List(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.CLIENT_ID)).asJava), 1, - entity => entity.entries.containsKey(ClientQuotaEntity.CLIENT_ID) && !entity.entries.containsKey(ClientQuotaEntity.USER) - ) - - // Match against all entities with the client ID type in an open-ended match. - testMatchEntities( - ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.CLIENT_ID)).asJava), 7, - entity => entity.entries.containsKey(ClientQuotaEntity.CLIENT_ID) - ) - - // Match against all entities with IP type in an open-ended match. - testMatchEntities( - ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.IP)).asJava), 2, - entity => entity.entries.containsKey(ClientQuotaEntity.IP) - ) - - // Match open-ended empty filter list. This should match all entities. - testMatchEntities(ClientQuotaFilter.contains(List.empty.asJava), 13, entity => true) - - // Match close-ended empty filter list. This should match no entities. - testMatchEntities(ClientQuotaFilter.containsOnly(List.empty.asJava), 0, _ => false) - } - - @ClusterTest - def testClientQuotasUnsupportedEntityTypes(): Unit = { - val entity = new ClientQuotaEntity(Map("other" -> "name").asJava) - assertThrows(classOf[UnsupportedVersionException], () => verifyDescribeEntityQuotas(entity, Map.empty)) - } - - @ClusterTest - def testClientQuotasSanitized(): Unit = { - // An entity with name that must be sanitized when writing to Zookeeper. - val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "user with spaces").asJava) - - alterEntityQuotas(entity, Map( - QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(20000.0), - ), validateOnly = false) - - verifyDescribeEntityQuotas(entity, Map( - QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0, - )) - } - - private def verifyDescribeEntityQuotas(entity: ClientQuotaEntity, quotas: Map[String, Double]): Unit = { - TestUtils.tryUntilNoAssertionError(waitTime = 5000L) { - val components = entity.entries.asScala.map { case (entityType, entityName) => - Option(entityName).map{ name => ClientQuotaFilterComponent.ofEntity(entityType, name)} - .getOrElse(ClientQuotaFilterComponent.ofDefaultEntity(entityType) - ) - } - val describe = describeClientQuotas(ClientQuotaFilter.containsOnly(components.toList.asJava)) - if (quotas.isEmpty) { - assertEquals(0, describe.size) - } else { - assertEquals(1, describe.size) - val configs = describe.get(entity) - assertNotNull(configs) - assertEquals(quotas.size, configs.size) - quotas.foreach { case (k, v) => - val value = configs.get(k) - assertNotNull(value) - assertEquals(v, value, 1e-6) - } - } - } - } - - private def toClientEntity(user: Option[String], clientId: Option[String]) = - new ClientQuotaEntity((user.map(ClientQuotaEntity.USER -> _) ++ clientId.map(ClientQuotaEntity.CLIENT_ID -> _)).toMap.asJava) - - private def toIpEntity(ip: Option[String]) = new ClientQuotaEntity(ip.map(ClientQuotaEntity.IP -> _).toMap.asJava) - - private def describeClientQuotas(filter: ClientQuotaFilter) = { - val result = new KafkaFutureImpl[java.util.Map[ClientQuotaEntity, java.util.Map[String, java.lang.Double]]] - sendDescribeClientQuotasRequest(filter).complete(result) - try result.get catch { - case e: ExecutionException => throw e.getCause - } - } - - private def sendDescribeClientQuotasRequest(filter: ClientQuotaFilter): DescribeClientQuotasResponse = { - val request = new DescribeClientQuotasRequest.Builder(filter).build() - IntegrationTestUtils.connectAndReceive[DescribeClientQuotasResponse]( - request, - cluster.boundPorts().get(0)) - } - - private def alterEntityQuotas(entity: ClientQuotaEntity, alter: Map[String, Option[Double]], validateOnly: Boolean) = - try alterClientQuotas(Map(entity -> alter), validateOnly)(entity).get(10, TimeUnit.SECONDS) catch { - case e: ExecutionException => throw e.getCause - } - - private def alterClientQuotas(request: Map[ClientQuotaEntity, Map[String, Option[Double]]], validateOnly: Boolean) = { - val entries = request.map { case (entity, alter) => - val ops = alter.map { case (key, value) => - new ClientQuotaAlteration.Op(key, value.map(Double.box).orNull) - }.asJavaCollection - new ClientQuotaAlteration(entity, ops) - } - - val response = request.map(e => e._1 -> new KafkaFutureImpl[Void]).asJava - sendAlterClientQuotasRequest(entries, validateOnly).complete(response) - val result = response.asScala - assertEquals(request.size, result.size) - request.foreach(e => assertTrue(result.contains(e._1))) - result - } - - private def sendAlterClientQuotasRequest(entries: Iterable[ClientQuotaAlteration], validateOnly: Boolean): AlterClientQuotasResponse = { - val request = new AlterClientQuotasRequest.Builder(entries.asJavaCollection, validateOnly).build() - IntegrationTestUtils.connectAndReceive[AlterClientQuotasResponse]( - request, - cluster.boundPorts().get(0)) - } -} diff --git a/server/src/test/java/org/apache/kafka/server/quota/ClientQuotasRequestTest.java b/server/src/test/java/org/apache/kafka/server/quota/ClientQuotasRequestTest.java new file mode 100644 index 00000000000..a4d562a5543 --- /dev/null +++ b/server/src/test/java/org/apache/kafka/server/quota/ClientQuotasRequestTest.java @@ -0,0 +1,726 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.quota; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AlterClientQuotasOptions; +import org.apache.kafka.clients.admin.AlterUserScramCredentialsResult; +import org.apache.kafka.clients.admin.ScramCredentialInfo; +import org.apache.kafka.clients.admin.ScramMechanism; +import org.apache.kafka.clients.admin.UserScramCredentialUpsertion; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.quota.ClientQuotaAlteration; +import org.apache.kafka.common.quota.ClientQuotaEntity; +import org.apache.kafka.common.quota.ClientQuotaFilter; +import org.apache.kafka.common.quota.ClientQuotaFilterComponent; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.server.config.QuotaConfig; +import org.apache.kafka.test.TestUtils; + +import java.net.InetAddress; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ClientQuotasRequestTest { + private final ClusterInstance cluster; + + public ClientQuotasRequestTest(ClusterInstance cluster) { + this.cluster = cluster; + } + + @ClusterTest + public void testAlterClientQuotasRequest() throws InterruptedException { + ClientQuotaEntity entity = new ClientQuotaEntity( + Map.of(ClientQuotaEntity.USER, "user", ClientQuotaEntity.CLIENT_ID, "client-id")); + + // Expect an empty configuration. + verifyDescribeEntityQuotas(entity, Map.of()); + + // Add two configuration entries. + alterEntityQuotas(entity, Map.of( + QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(10000.0), + QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(20000.0) + ), false); + + verifyDescribeEntityQuotas(entity, Map.of( + QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 10000.0, + QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0 + )); + + // Update an existing entry. + alterEntityQuotas(entity, Map.of( + QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(15000.0) + ), false); + + verifyDescribeEntityQuotas(entity, Map.of( + QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 15000.0, + QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0 + )); + + // Remove an existing configuration entry. + alterEntityQuotas(entity, Map.of( + QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, Optional.empty() + ), false); + + verifyDescribeEntityQuotas(entity, Map.of( + QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0 + )); + + // Remove a non-existent configuration entry. This should make no changes. + alterEntityQuotas(entity, Map.of( + QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Optional.empty() + ), false); + + verifyDescribeEntityQuotas(entity, Map.of( + QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0 + )); + + // Add back a deleted configuration entry. + alterEntityQuotas(entity, Map.of( + QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(5000.0) + ), false); + + verifyDescribeEntityQuotas(entity, Map.of( + QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 5000.0, + QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0 + )); + + // Perform a mixed update. + alterEntityQuotas(entity, Map.of( + QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(20000.0), + QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, Optional.empty(), + QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Optional.of(12.3) + ), false); + + verifyDescribeEntityQuotas(entity, Map.of( + QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0, + QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, 12.3 + )); + } + + @ClusterTest + public void testAlterClientQuotasRequestValidateOnly() throws InterruptedException { + ClientQuotaEntity entity = new ClientQuotaEntity(Map.of(ClientQuotaEntity.USER, "user")); + + // Set up a configuration. + alterEntityQuotas(entity, Map.of( + QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(20000.0), + QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Optional.of(23.45) + ), false); + + verifyDescribeEntityQuotas(entity, Map.of( + QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0, + QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, 23.45 + )); + + // Validate-only addition. + alterEntityQuotas(entity, Map.of( + QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(50000.0) + ), true); + + verifyDescribeEntityQuotas(entity, Map.of( + QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0, + QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, 23.45 + )); + + // Validate-only modification. + alterEntityQuotas(entity, Map.of( + QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(10000.0) + ), true); + + verifyDescribeEntityQuotas(entity, Map.of( + QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0, + QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, 23.45 + )); + + // Validate-only removal. + alterEntityQuotas(entity, Map.of( + QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Optional.empty() + ), true); + + verifyDescribeEntityQuotas(entity, Map.of( + QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0, + QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, 23.45 + )); + + // Validate-only mixed update. + alterEntityQuotas(entity, Map.of( + QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(10000.0), + QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(50000.0), + QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Optional.empty() + ), true); + + verifyDescribeEntityQuotas(entity, Map.of( + QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0, + QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, 23.45 + )); + } + + @ClusterTest + public void testClientQuotasForScramUsers() throws InterruptedException, ExecutionException { + final String userName = "user"; + + try (Admin admin = cluster.admin()) { + AlterUserScramCredentialsResult results = admin.alterUserScramCredentials(List.of( + new UserScramCredentialUpsertion(userName, new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "password"))); + results.all().get(); + + ClientQuotaEntity entity = new ClientQuotaEntity(Map.of(ClientQuotaEntity.USER, userName)); + + verifyDescribeEntityQuotas(entity, Map.of()); + + alterEntityQuotas(entity, Map.of( + QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(10000.0), + QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(20000.0) + ), false); + + verifyDescribeEntityQuotas(entity, Map.of( + QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 10000.0, + QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0 + )); + } + } + + @ClusterTest + public void testAlterIpQuotasRequest() throws InterruptedException { + final String knownHost = "1.2.3.4"; + final String unknownHost = "2.3.4.5"; + ClientQuotaEntity entity = toIpEntity(Optional.of(knownHost)); + ClientQuotaEntity defaultEntity = toIpEntity(Optional.empty()); + ClientQuotaFilterComponent entityFilter = ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.IP, knownHost); + ClientQuotaFilterComponent defaultEntityFilter = ClientQuotaFilterComponent.ofDefaultEntity(ClientQuotaEntity.IP); + ClientQuotaFilterComponent allIpEntityFilter = ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.IP); + + // Expect an empty configuration. + verifyIpQuotas(allIpEntityFilter, Map.of(), unknownHost); + + // Add a configuration entry. + alterEntityQuotas(entity, Map.of(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG, Optional.of(100.0)), false); + verifyIpQuotas(entityFilter, Map.of(entity, 100.0), unknownHost); + + // update existing entry + alterEntityQuotas(entity, Map.of(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG, Optional.of(150.0)), false); + verifyIpQuotas(entityFilter, Map.of(entity, 150.0), unknownHost); + + // update default value + alterEntityQuotas(defaultEntity, Map.of(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG, Optional.of(200.0)), false); + verifyIpQuotas(defaultEntityFilter, Map.of(defaultEntity, 200.0), unknownHost); + + // describe all IP quotas + verifyIpQuotas(allIpEntityFilter, Map.of(entity, 150.0, defaultEntity, 200.0), unknownHost); + + // remove entry + alterEntityQuotas(entity, Map.of(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG, Optional.empty()), false); + verifyIpQuotas(entityFilter, Map.of(), unknownHost); + + // remove default value + alterEntityQuotas(defaultEntity, Map.of(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG, Optional.empty()), false); + verifyIpQuotas(allIpEntityFilter, Map.of(), unknownHost); + } + + private void verifyIpQuotas(ClientQuotaFilterComponent entityFilter, Map<ClientQuotaEntity, Double> expectedMatches, + String unknownHost) throws InterruptedException { + + TestUtils.retryOnExceptionWithTimeout(5000L, () -> { + Map<ClientQuotaEntity, Map<String, Double>> result = describeClientQuotas( + ClientQuotaFilter.containsOnly(List.of(entityFilter))).get(); + assertEquals(expectedMatches.keySet(), result.keySet()); + + for (Map.Entry<ClientQuotaEntity, Map<String, Double>> entry : result.entrySet()) { + ClientQuotaEntity entity = entry.getKey(); + Map<String, Double> props = entry.getValue(); + assertEquals(Set.of(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG), props.keySet()); + assertEquals(expectedMatches.get(entity), props.get(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG)); + String entityName = entity.entries().get(ClientQuotaEntity.IP); + // ClientQuotaEntity with null name maps to default entity + InetAddress entityIp = entityName == null + ? InetAddress.getByName(unknownHost) + : InetAddress.getByName(entityName); + int currentServerQuota = cluster.brokers() + .values() + .iterator() + .next() + .socketServer() + .connectionQuotas() + .connectionRateForIp(entityIp); + assertTrue(Math.abs(expectedMatches.get(entity) - currentServerQuota) < 0.01, + String.format("Connection quota of %s is not %s but %s", entity, expectedMatches.get(entity), currentServerQuota)); + } + }); + } + + @ClusterTest + public void testAlterClientQuotasInvalidRequests() { + final ClientQuotaEntity entity1 = new ClientQuotaEntity(Map.of(ClientQuotaEntity.USER, "")); + TestUtils.assertFutureThrows(InvalidRequestException.class, + alterEntityQuotas(entity1, Map.of(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Optional.of(12.34)), true)); + + final ClientQuotaEntity entity2 = new ClientQuotaEntity(Map.of(ClientQuotaEntity.CLIENT_ID, "")); + TestUtils.assertFutureThrows(InvalidRequestException.class, + alterEntityQuotas(entity2, Map.of(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Optional.of(12.34)), true)); + + final ClientQuotaEntity entity3 = new ClientQuotaEntity(Map.of("", "name")); + TestUtils.assertFutureThrows(InvalidRequestException.class, + alterEntityQuotas(entity3, Map.of(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Optional.of(12.34)), true)); + + final ClientQuotaEntity entity4 = new ClientQuotaEntity(Map.of()); + TestUtils.assertFutureThrows(InvalidRequestException.class, + alterEntityQuotas(entity4, Map.of(QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(10000.5)), true)); + + final ClientQuotaEntity entity5 = new ClientQuotaEntity(Map.of(ClientQuotaEntity.USER, "user")); + TestUtils.assertFutureThrows(InvalidRequestException.class, + alterEntityQuotas(entity5, Map.of("bad", Optional.of(1.0)), true)); + + final ClientQuotaEntity entity6 = new ClientQuotaEntity(Map.of(ClientQuotaEntity.USER, "user")); + TestUtils.assertFutureThrows(InvalidRequestException.class, + alterEntityQuotas(entity6, Map.of(QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(10000.5)), true)); + } + + private void expectInvalidRequestWithMessage(Future<?> future, String expectedMessage) { + InvalidRequestException exception = TestUtils.assertFutureThrows(InvalidRequestException.class, future); + assertNotNull(exception); + assertTrue( + exception.getMessage().contains(expectedMessage), + String.format("Expected message %s to contain %s", exception, expectedMessage) + ); + } + + @ClusterTest + public void testAlterClientQuotasInvalidEntityCombination() { + ClientQuotaEntity userAndIpEntity = new ClientQuotaEntity( + Map.of(ClientQuotaEntity.USER, "user", ClientQuotaEntity.IP, "1.2.3.4") + ); + ClientQuotaEntity clientAndIpEntity = new ClientQuotaEntity( + Map.of(ClientQuotaEntity.CLIENT_ID, "client", ClientQuotaEntity.IP, "1.2.3.4") + ); + final String expectedExceptionMessage = "Invalid quota entity combination"; + + expectInvalidRequestWithMessage( + alterEntityQuotas(userAndIpEntity, Map.of(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Optional.of(12.34)), true), + expectedExceptionMessage + ); + + expectInvalidRequestWithMessage( + alterEntityQuotas(clientAndIpEntity, Map.of(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Optional.of(12.34)), true), + expectedExceptionMessage + ); + } + + @ClusterTest + public void testAlterClientQuotasBadIp() { + ClientQuotaEntity invalidHostPatternEntity = new ClientQuotaEntity( + Map.of(ClientQuotaEntity.IP, "not a valid host because it has spaces") + ); + ClientQuotaEntity unresolvableHostEntity = new ClientQuotaEntity( + Map.of(ClientQuotaEntity.IP, "RFC2606.invalid") + ); + final String expectedExceptionMessage = "not a valid IP"; + + expectInvalidRequestWithMessage( + alterEntityQuotas(invalidHostPatternEntity, Map.of(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG, Optional.of(50.0)), true), + expectedExceptionMessage + ); + + expectInvalidRequestWithMessage( + alterEntityQuotas(unresolvableHostEntity, Map.of(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG, Optional.of(50.0)), true), + expectedExceptionMessage + ); + } + + @ClusterTest + public void testDescribeClientQuotasInvalidFilterCombination() { + ClientQuotaFilterComponent ipFilterComponent = ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.IP); + ClientQuotaFilterComponent userFilterComponent = ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER); + ClientQuotaFilterComponent clientIdFilterComponent = ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.CLIENT_ID); + final String expectedExceptionMessage = "Invalid entity filter component combination"; + + expectInvalidRequestWithMessage( + describeClientQuotas(ClientQuotaFilter.contains(List.of(ipFilterComponent, userFilterComponent))), + expectedExceptionMessage + ); + expectInvalidRequestWithMessage( + describeClientQuotas(ClientQuotaFilter.contains(List.of(ipFilterComponent, clientIdFilterComponent))), + expectedExceptionMessage + ); + } + + // Entities to be matched against. + private final Map<ClientQuotaEntity, Double> matchUserClientEntities = new HashMap<>(Map.ofEntries( + Map.entry(toClientEntity(toUserMap("user-1"), toClientIdMap("client-id-1")), 50.50), + Map.entry(toClientEntity(toUserMap("user-2"), toClientIdMap("client-id-1")), 51.51), + Map.entry(toClientEntity(toUserMap("user-3"), toClientIdMap("client-id-2")), 52.52), + Map.entry(toClientEntity(toUserMap(null), toClientIdMap("client-id-1")), 53.53), + Map.entry(toClientEntity(toUserMap("user-1"), toClientIdMap(null)), 54.54), + Map.entry(toClientEntity(toUserMap("user-3"), toClientIdMap(null)), 55.55), + Map.entry(toClientEntity(toUserMap("user-1")), 56.56), + Map.entry(toClientEntity(toUserMap("user-2")), 57.57), + Map.entry(toClientEntity(toUserMap("user-3")), 58.58), + Map.entry(toClientEntity(toUserMap(null)), 59.59), + Map.entry(toClientEntity(toClientIdMap("client-id-2")), 60.60) + )); + + private final Map<ClientQuotaEntity, Double> matchIpEntities = Map.of( + toIpEntity(Optional.of("1.2.3.4")), 10.0, + toIpEntity(Optional.of("2.3.4.5")), 20.0 + ); + + private void setupDescribeClientQuotasMatchTest() { + Map<ClientQuotaEntity, Map<String, Optional<Double>>> userClientQuotas = matchUserClientEntities.entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, + e -> Map.of(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Optional.of(e.getValue())))); + + Map<ClientQuotaEntity, Map<String, Optional<Double>>> ipQuotas = matchIpEntities.entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, + e -> Map.of(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG, Optional.of(e.getValue())))); + + Map<ClientQuotaEntity, Map<String, Optional<Double>>> allQuotas = new HashMap<>(); + allQuotas.putAll(userClientQuotas); + allQuotas.putAll(ipQuotas); + Map<ClientQuotaEntity, KafkaFuture<Void>> result = alterClientQuotas(allQuotas, false); + + matchUserClientEntities.forEach((entity, value) -> { + try { + result.get(entity).get(10, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new RuntimeException(e); + } + }); + matchIpEntities.forEach((entity, value) -> { + try { + result.get(entity).get(10, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new RuntimeException(e); + } + }); + } + + private Map<ClientQuotaEntity, Map<String, Double>> matchEntity(ClientQuotaEntity entity) + throws ExecutionException, InterruptedException { + List<ClientQuotaFilterComponent> components = entity.entries().entrySet().stream().map(entry -> { + if (entry.getValue() == null) { + return ClientQuotaFilterComponent.ofDefaultEntity(entry.getKey()); + } else { + return ClientQuotaFilterComponent.ofEntity(entry.getKey(), entry.getValue()); + } + }).toList(); + + return describeClientQuotas(ClientQuotaFilter.containsOnly(components)).get(); + } + + @ClusterTest + public void testDescribeClientQuotasMatchExact() throws ExecutionException, InterruptedException { + setupDescribeClientQuotasMatchTest(); + + // Test exact matches. + matchUserClientEntities.forEach((e, v) -> { + try { + TestUtils.retryOnExceptionWithTimeout(5000L, () -> { + Map<ClientQuotaEntity, Map<String, Double>> result = matchEntity(e); + assertEquals(1, result.size()); + assertNotNull(result.get(e)); + double value = result.get(e).get(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG); + assertEquals(value, v, 1e-6); + }); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + }); + + // Entities not contained in `matchEntityList`. + List<ClientQuotaEntity> notMatchEntities = List.of( + toClientEntity(toUserMap("user-1"), toClientIdMap("client-id-2")), + toClientEntity(toUserMap("user-3"), toClientIdMap("client-id-1")), + toClientEntity(toUserMap("user-2"), toClientIdMap(null)), + toClientEntity(toUserMap("user-4")), + toClientEntity(toUserMap(null), toClientIdMap("client-id-2")), + toClientEntity(toClientIdMap("client-id-1")), + toClientEntity(toClientIdMap("client-id-3")) + ); + + // Verify exact matches of the non-matches returns empty. + for (ClientQuotaEntity e : notMatchEntities) { + Map<ClientQuotaEntity, Map<String, Double>> result = matchEntity(e); + assertEquals(0, result.size()); + } + } + + private void testMatchEntities(ClientQuotaFilter filter, int expectedMatchSize, Predicate<ClientQuotaEntity> partition) + throws InterruptedException { + TestUtils.retryOnExceptionWithTimeout(5000L, () -> { + Map<ClientQuotaEntity, Map<String, Double>> result = describeClientQuotas(filter).get(); + List<Map.Entry<ClientQuotaEntity, Double>> expectedMatches = matchUserClientEntities.entrySet() + .stream() + .collect(Collectors.partitioningBy(entry -> partition.test(entry.getKey()))) + .get(true); + expectedMatches.addAll(matchIpEntities.entrySet() + .stream() + .collect(Collectors.partitioningBy(entry -> partition.test(entry.getKey()))) + .get(true)); + + // for test verification + assertEquals(expectedMatchSize, expectedMatches.size()); + assertEquals(expectedMatchSize, result.size(), + "Failed to match " + expectedMatchSize + "entities for " + filter); + Map<Object, Object> expectedMatchesMap = Map.ofEntries(expectedMatches.toArray(new Map.Entry[0])); + matchUserClientEntities.forEach((entity, expectedValue) -> { + if (expectedMatchesMap.containsKey(entity)) { + Map<String, Double> config = result.get(entity); + assertNotNull(config); + Double value = config.get(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG); + assertNotNull(value); + assertEquals(expectedValue, value, 1e-6); + } else { + assertNull(result.get(entity)); + } + }); + matchIpEntities.forEach((entity, expectedValue) -> { + if (expectedMatchesMap.containsKey(entity)) { + Map<String, Double> config = result.get(entity); + assertNotNull(config); + Double value = config.get(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG); + assertNotNull(value); + assertEquals(expectedValue, value, 1e-6); + } else { + assertNull(result.get(entity)); + } + }); + }); + } + + @ClusterTest + public void testDescribeClientQuotasMatchPartial() throws InterruptedException { + setupDescribeClientQuotasMatchTest(); + + // Match open-ended existing user. + testMatchEntities( + ClientQuotaFilter.contains(List.of(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.USER, "user-1"))), + 3, + entity -> Objects.equals(entity.entries().get(ClientQuotaEntity.USER), "user-1") + ); + + // Match open-ended non-existent user. + testMatchEntities( + ClientQuotaFilter.contains(List.of(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.USER, "unknown"))), + 0, + entity -> false + ); + + // Match open-ended existing client ID. + testMatchEntities( + ClientQuotaFilter.contains(List.of(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.CLIENT_ID, "client-id-2"))), + 2, + entity -> Objects.equals(entity.entries().get(ClientQuotaEntity.CLIENT_ID), "client-id-2") + ); + + // Match open-ended default user. + testMatchEntities( + ClientQuotaFilter.contains(List.of(ClientQuotaFilterComponent.ofDefaultEntity(ClientQuotaEntity.USER))), + 2, + entity -> entity.entries().containsKey(ClientQuotaEntity.USER) && entity.entries().get(ClientQuotaEntity.USER) == null + ); + + // Match close-ended existing user. + testMatchEntities( + ClientQuotaFilter.containsOnly(List.of(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.USER, "user-2"))), + 1, + entity -> Objects.equals(entity.entries().get(ClientQuotaEntity.USER), "user-2") && !entity.entries().containsKey(ClientQuotaEntity.CLIENT_ID) + ); + + // Match close-ended existing client ID that has no matching entity. + testMatchEntities( + ClientQuotaFilter.containsOnly(List.of(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.CLIENT_ID, "client-id-1"))), + 0, + entity -> false + ); + + // Match against all entities with the user type in a close-ended match. + testMatchEntities( + ClientQuotaFilter.containsOnly(List.of(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER))), + 4, + entity -> entity.entries().containsKey(ClientQuotaEntity.USER) && !entity.entries().containsKey(ClientQuotaEntity.CLIENT_ID) + ); + + // Match against all entities with the user type in an open-ended match. + testMatchEntities( + ClientQuotaFilter.contains(List.of(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER))), + 10, + entity -> entity.entries().containsKey(ClientQuotaEntity.USER) + ); + + // Match against all entities with the client ID type in a close-ended match. + testMatchEntities( + ClientQuotaFilter.containsOnly(List.of(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.CLIENT_ID))), + 1, + entity -> entity.entries().containsKey(ClientQuotaEntity.CLIENT_ID) && !entity.entries().containsKey(ClientQuotaEntity.USER) + ); + + // Match against all entities with the client ID type in an open-ended match. + testMatchEntities( + ClientQuotaFilter.contains(List.of(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.CLIENT_ID))), + 7, + entity -> entity.entries().containsKey(ClientQuotaEntity.CLIENT_ID) + ); + + // Match against all entities with IP type in an open-ended match. + testMatchEntities( + ClientQuotaFilter.contains(List.of(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.IP))), + 2, + entity -> entity.entries().containsKey(ClientQuotaEntity.IP) + ); + + // Match open-ended empty filter List. This should match all entities. + testMatchEntities(ClientQuotaFilter.contains(List.of()), 13, entity -> true); + + // Match close-ended empty filter List. This should match no entities. + testMatchEntities(ClientQuotaFilter.containsOnly(List.of()), 0, entity -> false); + } + + @ClusterTest + public void testClientQuotasUnsupportedEntityTypes() { + ClientQuotaEntity entity = new ClientQuotaEntity(Map.of("other", "name")); + KafkaFuture<Map<ClientQuotaEntity, Map<String, Double>>> future = describeClientQuotas( + ClientQuotaFilter.containsOnly(getComponents(entity))); + + TestUtils.assertFutureThrows(UnsupportedVersionException.class, future); + } + + @ClusterTest + public void testClientQuotasSanitized() throws InterruptedException { + // An entity with name that must be sanitized when writing to Zookeeper. + ClientQuotaEntity entity = new ClientQuotaEntity(Map.of(ClientQuotaEntity.USER, "user with spaces")); + + alterEntityQuotas(entity, Map.of( + QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(20000.0) + ), false); + + verifyDescribeEntityQuotas(entity, Map.of( + QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0 + )); + } + + private Map<String, String> toUserMap(String user) { + // Uses Collections.singletonMap instead of Map.of to support null user parameter. + return Collections.singletonMap(ClientQuotaEntity.USER, user); + } + + private Map<String, String> toClientIdMap(String clientId) { + // Uses Collections.singletonMap instead of Map.of to support null client-id parameter. + return Collections.singletonMap(ClientQuotaEntity.CLIENT_ID, clientId); + } + + @SafeVarargs + private ClientQuotaEntity toClientEntity(Map<String, String>... entries) { + Map<String, String> entityMap = new HashMap<>(); + for (Map<String, String> entry : entries) { + entityMap.putAll(entry); + } + return new ClientQuotaEntity(entityMap); + } + + private ClientQuotaEntity toIpEntity(Optional<String> ip) { + return new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.IP, ip.orElse(null))); + } + + private void verifyDescribeEntityQuotas(ClientQuotaEntity entity, Map<String, Double> quotas) + throws InterruptedException { + TestUtils.retryOnExceptionWithTimeout(5000L, () -> { + + Map<ClientQuotaEntity, Map<String, Double>> describe = describeClientQuotas( + ClientQuotaFilter.containsOnly(getComponents(entity))).get(); + if (quotas.isEmpty()) { + assertEquals(0, describe.size()); + } else { + assertEquals(1, describe.size()); + Map<String, Double> configs = describe.get(entity); + assertNotNull(configs); + assertEquals(quotas.size(), configs.size()); + + quotas.forEach((k, v) -> { + Double value = configs.get(k); + assertNotNull(value); + assertEquals(v, value, 1e-6); + }); + } + }); + } + + private List<ClientQuotaFilterComponent> getComponents(ClientQuotaEntity entity) { + return entity.entries().entrySet().stream().map(entry -> { + String entityType = entry.getKey(); + String entityName = entry.getValue(); + return Optional.ofNullable(entityName) + .map(name -> ClientQuotaFilterComponent.ofEntity(entityType, name)) + .orElseGet(() -> ClientQuotaFilterComponent.ofDefaultEntity(entityType)); + }).toList(); + } + + private KafkaFuture<Map<ClientQuotaEntity, Map<String, Double>>> describeClientQuotas(ClientQuotaFilter filter) { + try (Admin admin = cluster.admin()) { + return admin.describeClientQuotas(filter).entities(); + } + } + + private KafkaFuture<Void> alterEntityQuotas(ClientQuotaEntity entity, Map<String, Optional<Double>> alter, boolean validateOnly) { + + return alterClientQuotas(Map.of(entity, alter), validateOnly).get(entity); + } + + private Map<ClientQuotaEntity, KafkaFuture<Void>> alterClientQuotas(Map<ClientQuotaEntity, Map<String, + Optional<Double>>> request, boolean validateOnly) { + + List<ClientQuotaAlteration> entries = request.entrySet().stream().map(entry -> { + ClientQuotaEntity entity = entry.getKey(); + Map<String, Optional<Double>> alter = entry.getValue(); + + List<ClientQuotaAlteration.Op> ops = alter.entrySet() + .stream() + .map(configEntry -> new ClientQuotaAlteration.Op(configEntry.getKey(), + configEntry.getValue().orElse(null))) + .collect(Collectors.toList()); + return new ClientQuotaAlteration(entity, ops); + }).collect(Collectors.toList()); + + try (Admin admin = cluster.admin()) { + Map<ClientQuotaEntity, KafkaFuture<Void>> result = admin.alterClientQuotas(entries, + new AlterClientQuotasOptions().validateOnly(validateOnly)).values(); + assertEquals(request.size(), result.size()); + request.forEach((e, r) -> assertTrue(result.containsKey(e))); + return result; + } + } +}