rondagostino commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1298706120
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2999,7 +2999,38 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleCreateTokenRequest(request: RequestChannel.Request): Unit = {
+ val createTokenRequest = request.body[CreateDelegationTokenRequest]
+
+ val requester = request.context.principal
+ val ownerPrincipalName = createTokenRequest.data.ownerPrincipalName
+ val owner = if (ownerPrincipalName == null || ownerPrincipalName.isEmpty) {
+ request.context.principal
+ } else {
+ new KafkaPrincipal(createTokenRequest.data.ownerPrincipalType,
ownerPrincipalName)
+ }
+ val renewerList =
createTokenRequest.data.renewers.asScala.toList.map(entry =>
+ new KafkaPrincipal(entry.principalType, entry.principalName))
+
+ if (!allowTokenRequests(request)) {
+ requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+
CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion,
requestThrottleMs,
+ Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, owner, requester))
+ } else if (!owner.equals(requester) &&
!authHelper.authorize(request.context, CREATE_TOKENS, USER, owner.toString)) {
+ requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+
CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion,
requestThrottleMs,
+ Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED, owner, requester))
+ } else if (renewerList.exists(principal => principal.getPrincipalType !=
KafkaPrincipal.USER_TYPE)) {
+ requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+
CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion,
requestThrottleMs,
+ Errors.INVALID_PRINCIPAL_TYPE, owner, requester))
+ } else {
+ maybeForwardToController(request, handleCreateTokenRequestZk)
+ }
+ }
+
+ def handleCreateTokenRequestZk(request: RequestChannel.Request): Unit = {
Review Comment:
I wonder if this is a broader problem. Are there other RPCs that mutate
ZooKeeper whose handlers need to check that the broker is the active controller
when migration is enabled? It is likely a short window of time, but there does
seem to be a possibility that an RPC could make it to a broker and end up
mutating ZooKeeper when in fact the active controller is a KRaft controller.
@mumrah WDYT?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]