This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9f955973fe5 KAFKA-18399 Remove ZooKeeper from KafkaApis (9/N): ALTER_CLIENT_QUOTAS and ALLOCATE_PRODUCER_IDS (#18465)
9f955973fe5 is described below

commit 9f955973fe5fc32e3929fbbf18ce1fccfd1c8258
Author: mingdaoy <mingd...@gmail.com>
AuthorDate: Wed Jan 15 05:06:16 2025 +0800

    KAFKA-18399 Remove ZooKeeper from KafkaApis (9/N): ALTER_CLIENT_QUOTAS and ALLOCATE_PRODUCER_IDS (#18465)
    
    Reviewers: Ismael Juma <ism...@juma.me.uk>, Chia-Ping Tsai <chia7...@gmail.com>
---
 core/src/main/scala/kafka/server/KafkaApis.scala   | 48 --------------------
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 52 +---------------------
 2 files changed, 1 insertion(+), 99 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 81820d29aee..318ca8f4263 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -222,7 +222,6 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.UNREGISTER_BROKER => forwardToController(request)
         case ApiKeys.DESCRIBE_TRANSACTIONS => 
handleDescribeTransactionsRequest(request)
         case ApiKeys.LIST_TRANSACTIONS => 
handleListTransactionsRequest(request)
-        case ApiKeys.ALLOCATE_PRODUCER_IDS => 
handleAllocateProducerIdsRequest(request)
         case ApiKeys.DESCRIBE_QUORUM => forwardToController(request)
         case ApiKeys.CONSUMER_GROUP_HEARTBEAT => 
handleConsumerGroupHeartbeat(request).exceptionally(handleError)
         case ApiKeys.CONSUMER_GROUP_DESCRIBE => 
handleConsumerGroupDescribe(request).exceptionally(handleError)
@@ -2507,37 +2506,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleAlterClientQuotasRequest(request: RequestChannel.Request): Unit = {
-    val zkSupport = 
metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
-    val alterClientQuotasRequest = request.body[AlterClientQuotasRequest]
-
-    if (authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, 
CLUSTER_NAME)) {
-      val result = 
zkSupport.adminManager.alterClientQuotas(alterClientQuotasRequest.entries.asScala,
-        alterClientQuotasRequest.validateOnly)
-
-      val entriesData = result.iterator.map { case (quotaEntity, apiError) =>
-        val entityData = quotaEntity.entries.asScala.iterator.map { case (key, 
value) =>
-          new AlterClientQuotasResponseData.EntityData()
-            .setEntityType(key)
-            .setEntityName(value)
-        }.toBuffer
-
-        new AlterClientQuotasResponseData.EntryData()
-          .setErrorCode(apiError.error.code)
-          .setErrorMessage(apiError.message)
-          .setEntity(entityData.asJava)
-      }.toBuffer
-
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new AlterClientQuotasResponse(new AlterClientQuotasResponseData()
-          .setThrottleTimeMs(requestThrottleMs)
-          .setEntries(entriesData.asJava)))
-    } else {
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        alterClientQuotasRequest.getErrorResponse(requestThrottleMs, 
Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
-    }
-  }
-
   def handleDescribeUserScramCredentialsRequest(request: 
RequestChannel.Request): Unit = {
     val describeUserScramCredentialsRequest = 
request.body[DescribeUserScramCredentialsRequest]
 
@@ -2695,22 +2663,6 @@ class KafkaApis(val requestChannel: RequestChannel,
       new 
ListTransactionsResponse(response.setThrottleTimeMs(requestThrottleMs)))
   }
 
-  def handleAllocateProducerIdsRequest(request: RequestChannel.Request): Unit 
= {
-    val zkSupport = 
metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
-    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
-
-    val allocateProducerIdsRequest = request.body[AllocateProducerIdsRequest]
-
-    if (!zkSupport.controller.isActive)
-      requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs =>
-        allocateProducerIdsRequest.getErrorResponse(throttleTimeMs, 
Errors.NOT_CONTROLLER.exception))
-    else
-      
zkSupport.controller.allocateProducerIds(allocateProducerIdsRequest.data, 
producerIdsResponse =>
-        requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs =>
-          new 
AllocateProducerIdsResponse(producerIdsResponse.setThrottleTimeMs(throttleTimeMs)))
-      )
-  }
-
   private def groupVersion(): GroupVersion = {
     
GroupVersion.fromFeatureLevel(metadataCache.features.finalizedFeatures.getOrDefault(GroupVersion.FEATURE_NAME,
 0.toShort))
   }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 8b91603dbc8..34b1d7cd274 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -35,7 +35,7 @@ import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, 
BROKER_LOGGER}
 import org.apache.kafka.common.errors.{ClusterAuthorizationException, 
UnsupportedVersionException}
-import org.apache.kafka.common.internals.{KafkaFutureImpl, Topic}
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.memory.MemoryPool
 import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic,
 AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction, 
AddPartitionsToTxnTransactionCollection}
 import 
org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnResult
@@ -61,7 +61,6 @@ import org.apache.kafka.common.message._
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ClientInformation, ListenerName}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, MessageUtil}
-import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType
 import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata
@@ -730,48 +729,6 @@ class KafkaApisTest extends Logging {
     assertEquals(expectedResults, responseMap)
   }
 
-  @Test
-  def testAlterClientQuotasWithAuthorizer(): Unit = {
-    val authorizer: Authorizer = mock(classOf[Authorizer])
-
-    authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, 
ResourceType.CLUSTER,
-      Resource.CLUSTER_NAME, AuthorizationResult.DENIED)
-
-    val quotaEntity = new 
ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"))
-    val quotas = Seq(new ClientQuotaAlteration(quotaEntity, 
Seq.empty.asJavaCollection))
-
-    val requestHeader = new RequestHeader(ApiKeys.ALTER_CLIENT_QUOTAS, 
ApiKeys.ALTER_CLIENT_QUOTAS.latestVersion, clientId, 0)
-
-    val alterClientQuotasRequest = new 
AlterClientQuotasRequest.Builder(quotas.asJavaCollection, false)
-      .build(requestHeader.apiVersion)
-    val request = buildRequest(alterClientQuotasRequest,
-      fromPrivilegedListener = true, requestHeader = Option(requestHeader))
-
-    when(controller.isActive).thenReturn(true)
-    
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
-      anyLong)).thenReturn(0)
-    kafkaApis = createKafkaApis(authorizer = Some(authorizer))
-    kafkaApis.handleAlterClientQuotasRequest(request)
-
-    val capturedResponse = 
verifyNoThrottling[AlterClientQuotasResponse](request)
-    verifyAlterClientQuotaResult(capturedResponse, Map(quotaEntity -> 
Errors.CLUSTER_AUTHORIZATION_FAILED))
-
-    verify(authorizer).authorize(any(), any())
-    verify(clientRequestQuotaManager).maybeRecordAndGetThrottleTimeMs(any(), 
anyLong)
-  }
-
-  private def verifyAlterClientQuotaResult(response: AlterClientQuotasResponse,
-                                           expected: Map[ClientQuotaEntity, 
Errors]): Unit = {
-    val futures = expected.keys.map(quotaEntity => quotaEntity -> new 
KafkaFutureImpl[Void]()).toMap
-    response.complete(futures.asJava)
-    futures.foreach {
-      case (entity, future) =>
-        future.whenComplete((_, thrown) =>
-          assertEquals(thrown, expected(entity).exception())
-        ).isDone
-    }
-  }
-
   @ParameterizedTest
   @CsvSource(value = Array("0,1500", "1500,0", "3000,1000"))
   def testKRaftControllerThrottleTimeEnforced(
@@ -10027,13 +9984,6 @@ class KafkaApisTest extends Logging {
         setResourceType(BROKER_LOGGER.id()))),
       response.data())
   }
-  
-  @Test
-  def testRaftShouldAlwaysForwardAlterClientQuotasRequest(): Unit = {
-    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
-    kafkaApis = createKafkaApis(raftSupport = true)
-    
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterClientQuotasRequest)
-  }
 
   @Test
   def testConsumerGroupHeartbeatReturnsUnsupportedVersion(): Unit = {

Reply via email to