This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new b42273625e2 KAFKA-18399 Remove ZooKeeper from KafkaApis (6/N): `handleCreateTokenRequest`, `handleRenewTokenRequestZk`, `handleExpireTokenRequestZk` (#18447)
b42273625e2 is described below

commit b42273625e2cd2f1340955a9b731e44112fb301c
Author: Ken Huang <s7133...@gmail.com>
AuthorDate: Mon Jan 13 01:42:32 2025 +0800

    KAFKA-18399 Remove ZooKeeper from KafkaApis (6/N): `handleCreateTokenRequest`, `handleRenewTokenRequestZk`, `handleExpireTokenRequestZk` (#18447)
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 core/src/main/scala/kafka/server/KafkaApis.scala   | 121 ++-------------------
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  29 +----
 2 files changed, 8 insertions(+), 142 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c084b637965..9d257c39d59 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -77,7 +77,6 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, 
FetchParams, FetchPa
 import org.apache.kafka.storage.internals.log.AppendOrigin
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 
-import java.nio.ByteBuffer
 import java.time.Duration
 import java.util
 import java.util.concurrent.atomic.AtomicInteger
@@ -2570,92 +2569,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleCreateTokenRequestZk(request: RequestChannel.Request): Unit = {
-    val zkSupport = 
metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
-
-    val createTokenRequest = request.body[CreateDelegationTokenRequest]
-
-    // the callback for sending a create token response
-    def sendResponseCallback(createResult: CreateTokenResult): Unit = {
-      trace(s"Sending create token response for correlation id 
${request.header.correlationId} " +
-        s"to client ${request.header.clientId}.")
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        
CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, 
requestThrottleMs, createResult.error, createResult.owner,
-          createResult.tokenRequester, createResult.issueTimestamp, 
createResult.expiryTimestamp, createResult.maxTimestamp, createResult.tokenId,
-          ByteBuffer.wrap(createResult.hmac)))
-    }
-
-    val ownerPrincipalName = createTokenRequest.data.ownerPrincipalName
-    val owner = if (ownerPrincipalName == null || ownerPrincipalName.isEmpty) {
-      request.context.principal
-    } else {
-      new KafkaPrincipal(createTokenRequest.data.ownerPrincipalType, 
ownerPrincipalName)
-    }
-    val requester = request.context.principal
-    val renewerList = 
createTokenRequest.data.renewers.asScala.toList.map(entry =>
-      new KafkaPrincipal(entry.principalType, entry.principalName))
-
-    // DelegationToken changes only need to be executed on the controller 
during migration
-    if (!zkSupport.controller.isActive) {
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        
CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, 
requestThrottleMs,
-          Errors.NOT_CONTROLLER, owner, requester))
-    } else {
-      tokenManager.createToken(
-        owner,
-        requester,
-        renewerList,
-        createTokenRequest.data.maxLifetimeMs,
-        sendResponseCallback)
-    }
-  }
-
-  def handleRenewTokenRequest(request: RequestChannel.Request): Unit = {
-    if (!allowTokenRequests(request)) {
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new RenewDelegationTokenResponse(
-          new RenewDelegationTokenResponseData()
-              .setThrottleTimeMs(requestThrottleMs)
-              .setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code)
-              .setExpiryTimestampMs(DelegationTokenManager.ErrorTimestamp)))
-    } else {
-      forwardToController(request)
-    }
-  }
-
-  def handleRenewTokenRequestZk(request: RequestChannel.Request): Unit = {
-    val zkSupport = 
metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
-
-    val renewTokenRequest = request.body[RenewDelegationTokenRequest]
-
-    // the callback for sending a renew token response
-    def sendResponseCallback(error: Errors, expiryTimestamp: Long): Unit = {
-      trace("Sending renew token response for correlation id %d to client %s."
-        .format(request.header.correlationId, request.header.clientId))
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new RenewDelegationTokenResponse(
-             new RenewDelegationTokenResponseData()
-               .setThrottleTimeMs(requestThrottleMs)
-               .setErrorCode(error.code)
-               .setExpiryTimestampMs(expiryTimestamp)))
-    }
-    // DelegationToken changes only need to be executed on the controller 
during migration
-    if (!zkSupport.controller.isActive) {
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new RenewDelegationTokenResponse(
-          new RenewDelegationTokenResponseData()
-            .setThrottleTimeMs(requestThrottleMs)
-            .setErrorCode(Errors.NOT_CONTROLLER.code)))
-    } else {
-      tokenManager.renewToken(
-        request.context.principal,
-        ByteBuffer.wrap(renewTokenRequest.data.hmac),
-        renewTokenRequest.data.renewPeriodMs,
-        sendResponseCallback
-      )
-    }
-  }
-
   def handleExpireTokenRequest(request: RequestChannel.Request): Unit = {
     if (!allowTokenRequests(request)) {
       requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
@@ -2669,36 +2582,16 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleExpireTokenRequestZk(request: RequestChannel.Request): Unit = {
-    val zkSupport = 
metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
-
-    val expireTokenRequest = request.body[ExpireDelegationTokenRequest]
-
-    // the callback for sending a expire token response
-    def sendResponseCallback(error: Errors, expiryTimestamp: Long): Unit = {
-      trace("Sending expire token response for correlation id %d to client %s."
-        .format(request.header.correlationId, request.header.clientId))
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new ExpireDelegationTokenResponse(
-            new ExpireDelegationTokenResponseData()
-              .setThrottleTimeMs(requestThrottleMs)
-              .setErrorCode(error.code)
-              .setExpiryTimestampMs(expiryTimestamp)))
-    }
-    // DelegationToken changes only need to be executed on the controller 
during migration
-    if (!zkSupport.controller.isActive) {
+  def handleRenewTokenRequest(request: RequestChannel.Request): Unit = {
+    if (!allowTokenRequests(request)) {
       requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new ExpireDelegationTokenResponse(
-          new ExpireDelegationTokenResponseData()
+        new RenewDelegationTokenResponse(
+          new RenewDelegationTokenResponseData()
             .setThrottleTimeMs(requestThrottleMs)
-            .setErrorCode(Errors.NOT_CONTROLLER.code)))
+            .setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code)
+            .setExpiryTimestampMs(DelegationTokenManager.ErrorTimestamp)))
     } else {
-      tokenManager.expireToken(
-        request.context.principal,
-        expireTokenRequest.hmac(),
-        expireTokenRequest.expiryTimePeriod(),
-        sendResponseCallback
-      )
+      forwardToController(request)
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index d8532c6cd8b..eb91e92d9fd 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -10325,34 +10325,7 @@ class KafkaApisTest extends Logging {
         setResourceType(BROKER_LOGGER.id()))),
       response.data())
   }
-
-  @Test
-  // Test that in KRaft mode, a request that isn't forwarded gets the correct 
error message.
-  // We skip the pre-forward checks in handleCreateTokenRequest
-  def testRaftShouldAlwaysForwardCreateTokenRequest(): Unit = {
-    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
-    kafkaApis = createKafkaApis(raftSupport = true)
-    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleCreateTokenRequestZk)
-  }
-
-  @Test
-  // Test that in KRaft mode, a request that isn't forwarded gets the correct 
error message.
-  // We skip the pre-forward checks in handleRenewTokenRequest
-  def testRaftShouldAlwaysForwardRenewTokenRequest(): Unit = {
-    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
-    kafkaApis = createKafkaApis(raftSupport = true)
-    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleRenewTokenRequestZk)
-  }
-
-  @Test
-  // Test that in KRaft mode, a request that isn't forwarded gets the correct 
error message.
-  // We skip the pre-forward checks in handleExpireTokenRequest
-  def testRaftShouldAlwaysForwardExpireTokenRequest(): Unit = {
-    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
-    kafkaApis = createKafkaApis(raftSupport = true)
-    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleExpireTokenRequestZk)
-  }
-
+  
   @Test
   def testRaftShouldAlwaysForwardAlterClientQuotasRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)

Reply via email to