This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 9f955973fe5 KAFKA-18399 Remove ZooKeeper from KafkaApis (9/N): ALTER_CLIENT_QUOTAS and ALLOCATE_PRODUCER_IDS (#18465) 9f955973fe5 is described below commit 9f955973fe5fc32e3929fbbf18ce1fccfd1c8258 Author: mingdaoy <mingd...@gmail.com> AuthorDate: Wed Jan 15 05:06:16 2025 +0800 KAFKA-18399 Remove ZooKeeper from KafkaApis (9/N): ALTER_CLIENT_QUOTAS and ALLOCATE_PRODUCER_IDS (#18465) Reviewers: Ismael Juma <ism...@juma.me.uk>, Chia-Ping Tsai <chia7...@gmail.com> --- core/src/main/scala/kafka/server/KafkaApis.scala | 48 -------------------- .../scala/unit/kafka/server/KafkaApisTest.scala | 52 +--------------------- 2 files changed, 1 insertion(+), 99 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 81820d29aee..318ca8f4263 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -222,7 +222,6 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.UNREGISTER_BROKER => forwardToController(request) case ApiKeys.DESCRIBE_TRANSACTIONS => handleDescribeTransactionsRequest(request) case ApiKeys.LIST_TRANSACTIONS => handleListTransactionsRequest(request) - case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request) case ApiKeys.DESCRIBE_QUORUM => forwardToController(request) case ApiKeys.CONSUMER_GROUP_HEARTBEAT => handleConsumerGroupHeartbeat(request).exceptionally(handleError) case ApiKeys.CONSUMER_GROUP_DESCRIBE => handleConsumerGroupDescribe(request).exceptionally(handleError) @@ -2507,37 +2506,6 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def handleAlterClientQuotasRequest(request: RequestChannel.Request): Unit = { - val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request)) - val alterClientQuotasRequest = request.body[AlterClientQuotasRequest] - - if (authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)) { - val result = zkSupport.adminManager.alterClientQuotas(alterClientQuotasRequest.entries.asScala, - alterClientQuotasRequest.validateOnly) - - val entriesData = result.iterator.map { case (quotaEntity, apiError) => - val entityData = quotaEntity.entries.asScala.iterator.map { case (key, value) => - new AlterClientQuotasResponseData.EntityData() - .setEntityType(key) - .setEntityName(value) - }.toBuffer - - new AlterClientQuotasResponseData.EntryData() - .setErrorCode(apiError.error.code) - .setErrorMessage(apiError.message) - .setEntity(entityData.asJava) - }.toBuffer - - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => - new AlterClientQuotasResponse(new AlterClientQuotasResponseData() - .setThrottleTimeMs(requestThrottleMs) - .setEntries(entriesData.asJava))) - } else { - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => - alterClientQuotasRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)) - } - } - def handleDescribeUserScramCredentialsRequest(request: RequestChannel.Request): Unit = { val describeUserScramCredentialsRequest = request.body[DescribeUserScramCredentialsRequest] @@ -2695,22 +2663,6 @@ class KafkaApis(val requestChannel: RequestChannel, new ListTransactionsResponse(response.setThrottleTimeMs(requestThrottleMs))) } - def handleAllocateProducerIdsRequest(request: RequestChannel.Request): Unit = { - val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request)) - authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) - - val allocateProducerIdsRequest = request.body[AllocateProducerIdsRequest] - - if (!zkSupport.controller.isActive) - requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => - allocateProducerIdsRequest.getErrorResponse(throttleTimeMs, Errors.NOT_CONTROLLER.exception)) - else - zkSupport.controller.allocateProducerIds(allocateProducerIdsRequest.data, producerIdsResponse => - requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => - new AllocateProducerIdsResponse(producerIdsResponse.setThrottleTimeMs(throttleTimeMs))) - ) - } - private def groupVersion(): GroupVersion = { GroupVersion.fromFeatureLevel(metadataCache.features.finalizedFeatures.getOrDefault(GroupVersion.FEATURE_NAME, 0.toShort)) } diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 8b91603dbc8..34b1d7cd274 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -35,7 +35,7 @@ import org.apache.kafka.common.compress.Compression import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER} import org.apache.kafka.common.errors.{ClusterAuthorizationException, UnsupportedVersionException} -import org.apache.kafka.common.internals.{KafkaFutureImpl, Topic} +import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.memory.MemoryPool import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic, AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection} import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnResult @@ -61,7 +61,6 @@ import org.apache.kafka.common.message._ import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.{ClientInformation, ListenerName} import org.apache.kafka.common.protocol.{ApiKeys, Errors, MessageUtil} -import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity} import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata @@ -730,48 +729,6 @@ class KafkaApisTest extends Logging { assertEquals(expectedResults, responseMap) } - @Test - def testAlterClientQuotasWithAuthorizer(): Unit = { - val authorizer: Authorizer = mock(classOf[Authorizer]) - - authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER, - Resource.CLUSTER_NAME, AuthorizationResult.DENIED) - - val quotaEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user")) - val quotas = Seq(new ClientQuotaAlteration(quotaEntity, Seq.empty.asJavaCollection)) - - val requestHeader = new RequestHeader(ApiKeys.ALTER_CLIENT_QUOTAS, ApiKeys.ALTER_CLIENT_QUOTAS.latestVersion, clientId, 0) - - val alterClientQuotasRequest = new AlterClientQuotasRequest.Builder(quotas.asJavaCollection, false) - .build(requestHeader.apiVersion) - val request = buildRequest(alterClientQuotasRequest, - fromPrivilegedListener = true, requestHeader = Option(requestHeader)) - - when(controller.isActive).thenReturn(true) - when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), - anyLong)).thenReturn(0) - kafkaApis = createKafkaApis(authorizer = Some(authorizer)) - kafkaApis.handleAlterClientQuotasRequest(request) - - val capturedResponse = verifyNoThrottling[AlterClientQuotasResponse](request) - verifyAlterClientQuotaResult(capturedResponse, Map(quotaEntity -> Errors.CLUSTER_AUTHORIZATION_FAILED)) - - verify(authorizer).authorize(any(), any()) - verify(clientRequestQuotaManager).maybeRecordAndGetThrottleTimeMs(any(), anyLong) - } - - private def verifyAlterClientQuotaResult(response: AlterClientQuotasResponse, - expected: Map[ClientQuotaEntity, Errors]): Unit = { - val futures = expected.keys.map(quotaEntity => quotaEntity -> new KafkaFutureImpl[Void]()).toMap - response.complete(futures.asJava) - futures.foreach { - case (entity, future) => - future.whenComplete((_, thrown) => - assertEquals(thrown, expected(entity).exception()) - ).isDone - } - } - @ParameterizedTest @CsvSource(value = Array("0,1500", "1500,0", "3000,1000")) def testKRaftControllerThrottleTimeEnforced( @@ -10027,13 +9984,6 @@ class KafkaApisTest extends Logging { setResourceType(BROKER_LOGGER.id()))), response.data()) } - - @Test - def testRaftShouldAlwaysForwardAlterClientQuotasRequest(): Unit = { - metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) - kafkaApis = createKafkaApis(raftSupport = true) - verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterClientQuotasRequest) - } @Test def testConsumerGroupHeartbeatReturnsUnsupportedVersion(): Unit = {