omkreddy commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1293243166


##########
core/src/main/scala/kafka/server/ControllerApis.scala:
##########
@@ -842,6 +847,75 @@ class ControllerApis(val requestChannel: RequestChannel,
       }
   }
 
+  def handleCreateDelegationTokenRequest(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
+    val alterRequest = request.body[CreateDelegationTokenRequest]

Review Comment:
    alterRequest => createTokenRequest



##########
core/src/main/scala/kafka/server/ControllerApis.scala:
##########
@@ -842,6 +847,75 @@ class ControllerApis(val requestChannel: RequestChannel,
       }
   }
 
+  def handleCreateDelegationTokenRequest(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
+    val alterRequest = request.body[CreateDelegationTokenRequest]
+
+    val requester = request.context.principal
+    val ownerPrincipalName = alterRequest.data.ownerPrincipalName
+    val ownerPrincipalType = alterRequest.data.ownerPrincipalType
+    val owner = if (ownerPrincipalName == null || ownerPrincipalName.isEmpty) {
+      request.context.principal
+    } else {
+      new KafkaPrincipal(ownerPrincipalType, ownerPrincipalName)
+    }
+
+    // Requester is always allowed to create token for self
+    if (!owner.equals(requester) && 

Review Comment:
   why is this check if its already verified in KafkaAPIs.scala?



##########
core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala:
##########
@@ -109,13 +127,44 @@ class DelegationTokenRequestsTest extends BaseRequestTest 
with SaslSetup {
     val expireResult2 = adminClient.expireDelegationToken(token2.hmac())
     expiryTimestamp = expireResult2.expiryTimestamp().get()
 
+    val expireResult3 = adminClient.expireDelegationToken(token3.hmac())
+    expiryTimestamp = expireResult3.expiryTimestamp().get()
+
+    TestUtils.waitUntilTrue(() => brokers.forall(server => 
server.tokenCache.tokens().size() == 0),
+          "Timed out waiting for token to propagate to all servers")
+
     tokens = adminClient.describeDelegationToken().delegationTokens().get()
     assertTrue(tokens.size == 0)
 
     //create token with invalid principal type
-    val renewer3 = 
List(SecurityUtils.parseKafkaPrincipal("Group:Renewer3")).asJava
-    val createResult3 = adminClient.createDelegationToken(new 
CreateDelegationTokenOptions().renewers(renewer3))
-    assertThrows(classOf[ExecutionException], () => 
createResult3.delegationToken().get()).getCause.isInstanceOf[InvalidPrincipalTypeException]
+    val renewer4 = 
List(SecurityUtils.parseKafkaPrincipal("Group:Renewer4")).asJava
+    val createResult4 = adminClient.createDelegationToken(new 
CreateDelegationTokenOptions().renewers(renewer4))
+    val createResult4Error = assertThrows(classOf[ExecutionException], () => 
createResult4.delegationToken().get())
+    
assertTrue(createResult4Error.getCause.isInstanceOf[InvalidPrincipalTypeException])
+
+    // Try to renew a deleted token
+    val renewResultPostDelete = adminClient.renewDelegationToken(token1.hmac())
+    val renewResultPostDeleteError = assertThrows(classOf[ExecutionException], 
() => renewResultPostDelete.expiryTimestamp().get())
+    
assertTrue(renewResultPostDeleteError.getCause.isInstanceOf[DelegationTokenNotFoundException])
+
+    // Create a DelegationToken with a short lifetime to validate the expire 
code
+    val createResult5 = adminClient.createDelegationToken(new 
CreateDelegationTokenOptions()
+      .renewers(renewer1)
+      .maxlifeTimeMs(60 * 1000))
+    val token5 = createResult5.delegationToken().get()
+
+    TestUtils.waitUntilTrue(() => brokers.forall(server => 
server.tokenCache.tokens().size() == 1),
+          "Timed out waiting for token to propagate to all servers")
+    
+    Thread.sleep(2 * 60 *1000)

Review Comment:
   is this Thread.sleep required?



##########
core/src/main/scala/kafka/server/DelegationTokenManager.scala:
##########
@@ -186,57 +133,28 @@ class DelegationTokenManager(val config: KafkaConfig,
   val tokenMaxLifetime: Long = config.delegationTokenMaxLifeMs
   val defaultTokenRenewTime: Long = config.delegationTokenExpiryTimeMs
   val tokenRemoverScanInterval: Long = 
config.delegationTokenExpiryCheckIntervalMs

Review Comment:
   can we remove this usused `tokenRemoverScanInterval` 



##########
core/src/main/scala/kafka/server/ControllerApis.scala:
##########
@@ -842,6 +847,75 @@ class ControllerApis(val requestChannel: RequestChannel,
       }
   }
 
+  def handleCreateDelegationTokenRequest(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
+    val alterRequest = request.body[CreateDelegationTokenRequest]
+
+    val requester = request.context.principal
+    val ownerPrincipalName = alterRequest.data.ownerPrincipalName
+    val ownerPrincipalType = alterRequest.data.ownerPrincipalType
+    val owner = if (ownerPrincipalName == null || ownerPrincipalName.isEmpty) {
+      request.context.principal
+    } else {
+      new KafkaPrincipal(ownerPrincipalType, ownerPrincipalName)
+    }
+
+    // Requester is always allowed to create token for self
+    if (!owner.equals(requester) && 
+      !authHelper.authorize(request.context, CREATE_TOKENS, USER, 
owner.toString)) {
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+        
CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, 
requestThrottleMs,
+          Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED, owner, requester))
+    }
+
+    val context = new ControllerRequestContext(request.context.header.data, 
request.context.principal,
+      OptionalLong.empty())
+
+    // Copy the response data to a new response so we can apply the request 
version
+    controller.createDelegationToken(context, alterRequest.data)
+      .thenApply[Unit] { response =>
+         requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+           CreateDelegationTokenResponse.prepareResponse(
+             request.context.requestVersion,
+             requestThrottleMs,
+             Errors.forCode(response.errorCode()),
+             new KafkaPrincipal(response.principalType(), 
response.principalName()),
+             new KafkaPrincipal(response.tokenRequesterPrincipalType(), 
response.tokenRequesterPrincipalName()),
+             response.issueTimestampMs(),
+             response.expiryTimestampMs(),
+             response.maxTimestampMs(),
+             response.tokenId(),
+             ByteBuffer.wrap(response.hmac())))
+      }
+  }
+
+  def handleRenewDelegationTokenRequest(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
+     val alterRequest = request.body[RenewDelegationTokenRequest]

Review Comment:
    alterRequest => renewTokenRequest



##########
core/src/main/scala/kafka/server/DelegationTokenManager.scala:
##########
@@ -24,30 +24,20 @@ import java.util.Base64
 
 import javax.crypto.spec.SecretKeySpec
 import javax.crypto.{Mac, SecretKey}
-import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
-import kafka.utils.{CoreUtils, Json, Logging}
-import kafka.zk.{DelegationTokenChangeNotificationSequenceZNode, 
DelegationTokenChangeNotificationZNode, DelegationTokensZNode, KafkaZkClient}
+import kafka.utils.Logging
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.security.scram.internals.{ScramFormatter, 
ScramMechanism}
 import org.apache.kafka.common.security.scram.ScramCredential
 import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
 import org.apache.kafka.common.security.token.delegation.{DelegationToken, 
TokenInformation}
-import org.apache.kafka.common.utils.{Sanitizer, SecurityUtils, Time}
+import org.apache.kafka.common.utils.Time
 
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable
 
 object DelegationTokenManager {
   val DefaultHmacAlgorithm = "HmacSHA512"
-  val OwnerKey ="owner"
-  val TokenRequesterKey = "tokenRequester"
-  val RenewersKey = "renewers"
-  val IssueTimestampKey = "issueTimestamp"
-  val MaxTimestampKey = "maxTimestamp"
-  val ExpiryTimestampKey = "expiryTimestamp"
-  val TokenIdKey = "tokenId"
-  val VersionKey = "version"
   val CurrentVersion = 3
   val ErrorTimestamp = -1
 

Review Comment:
   `createBase64HMAC` method is unused



##########
core/src/main/scala/kafka/server/ControllerApis.scala:
##########
@@ -842,6 +847,75 @@ class ControllerApis(val requestChannel: RequestChannel,
       }
   }
 
+  def handleCreateDelegationTokenRequest(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
+    val alterRequest = request.body[CreateDelegationTokenRequest]
+
+    val requester = request.context.principal
+    val ownerPrincipalName = alterRequest.data.ownerPrincipalName
+    val ownerPrincipalType = alterRequest.data.ownerPrincipalType
+    val owner = if (ownerPrincipalName == null || ownerPrincipalName.isEmpty) {
+      request.context.principal
+    } else {
+      new KafkaPrincipal(ownerPrincipalType, ownerPrincipalName)
+    }
+
+    // Requester is always allowed to create token for self
+    if (!owner.equals(requester) && 
+      !authHelper.authorize(request.context, CREATE_TOKENS, USER, 
owner.toString)) {
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+        
CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, 
requestThrottleMs,
+          Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED, owner, requester))
+    }
+
+    val context = new ControllerRequestContext(request.context.header.data, 
request.context.principal,
+      OptionalLong.empty())
+
+    // Copy the response data to a new response so we can apply the request 
version
+    controller.createDelegationToken(context, alterRequest.data)
+      .thenApply[Unit] { response =>
+         requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+           CreateDelegationTokenResponse.prepareResponse(
+             request.context.requestVersion,
+             requestThrottleMs,
+             Errors.forCode(response.errorCode()),
+             new KafkaPrincipal(response.principalType(), 
response.principalName()),
+             new KafkaPrincipal(response.tokenRequesterPrincipalType(), 
response.tokenRequesterPrincipalName()),
+             response.issueTimestampMs(),
+             response.expiryTimestampMs(),
+             response.maxTimestampMs(),
+             response.tokenId(),
+             ByteBuffer.wrap(response.hmac())))
+      }
+  }
+
+  def handleRenewDelegationTokenRequest(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
+     val alterRequest = request.body[RenewDelegationTokenRequest]
+
+     val context = new ControllerRequestContext(
+       request.context.header.data,
+       request.context.principal,
+       OptionalLong.empty())
+     controller.renewDelegationToken(context, alterRequest.data)
+       .thenApply[Unit] { response =>
+         requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+           new 
RenewDelegationTokenResponse(response.setThrottleTimeMs(requestThrottleMs)))
+      }
+  }
+
+  def handleExpireDelegationTokenRequest(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
+     val alterRequest = request.body[ExpireDelegationTokenRequest]

Review Comment:
   alterRequest => expireTokenRequest



##########
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##########
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import 
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+    private Time time = Time.SYSTEM;
+
+    static class Builder {
+        private LogContext logContext = null;
+        private SnapshotRegistry snapshotRegistry = null;
+        private DelegationTokenCache tokenCache = null;
+        private String secretKeyString = null;
+        private long tokenDefaultMaxLifetime = 0;
+        private long tokenDefaultRenewLifetime = 0;
+        private long tokenRemoverScanInterval = 0;
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+            this.snapshotRegistry = snapshotRegistry;
+            return this;
+        }
+
+        Builder setTokenCache(DelegationTokenCache tokenCache) {
+            this.tokenCache = tokenCache;
+            return this;
+        }
+
+        Builder setTokenKeyString(String secretKeyString) {
+            this.secretKeyString = secretKeyString;
+            return this;
+        }
+
+        Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
+            this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+            return this;
+        }
+
+        Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) 
{
+            this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+            return this;
+        }
+
+        Builder setDelegationTokenExpiryCheckIntervalMs(long 
tokenRemoverScanInterval) {
+            this.tokenRemoverScanInterval = tokenRemoverScanInterval;
+            return this;
+        }
+
+        DelegationTokenControlManager build() {
+            if (logContext == null) logContext = new LogContext();
+            if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+            return new DelegationTokenControlManager(
+              logContext,
+              snapshotRegistry,
+              tokenCache,
+              secretKeyString,
+              tokenDefaultMaxLifetime,
+              tokenDefaultRenewLifetime,
+              tokenRemoverScanInterval);
+        }
+    }
+
+    private final Logger log;
+    private final DelegationTokenCache tokenCache;
+    private final String secretKeyString;
+    private final long tokenDefaultMaxLifetime;
+    private final long tokenDefaultRenewLifetime;
+    private final long tokenRemoverScanInterval;

Review Comment:
   unused `tokenRemoverScanInterval`?



##########
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##########
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import 
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+    private Time time = Time.SYSTEM;
+
+    static class Builder {
+        private LogContext logContext = null;
+        private SnapshotRegistry snapshotRegistry = null;
+        private DelegationTokenCache tokenCache = null;
+        private String secretKeyString = null;
+        private long tokenDefaultMaxLifetime = 0;
+        private long tokenDefaultRenewLifetime = 0;
+        private long tokenRemoverScanInterval = 0;
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+            this.snapshotRegistry = snapshotRegistry;
+            return this;
+        }
+
+        Builder setTokenCache(DelegationTokenCache tokenCache) {
+            this.tokenCache = tokenCache;
+            return this;
+        }
+
+        Builder setTokenKeyString(String secretKeyString) {
+            this.secretKeyString = secretKeyString;
+            return this;
+        }
+
+        Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
+            this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+            return this;
+        }
+
+        Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) 
{
+            this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+            return this;
+        }
+
+        Builder setDelegationTokenExpiryCheckIntervalMs(long 
tokenRemoverScanInterval) {
+            this.tokenRemoverScanInterval = tokenRemoverScanInterval;
+            return this;
+        }
+
+        DelegationTokenControlManager build() {
+            if (logContext == null) logContext = new LogContext();
+            if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+            return new DelegationTokenControlManager(
+              logContext,
+              snapshotRegistry,
+              tokenCache,
+              secretKeyString,
+              tokenDefaultMaxLifetime,
+              tokenDefaultRenewLifetime,
+              tokenRemoverScanInterval);
+        }
+    }
+
+    private final Logger log;
+    private final DelegationTokenCache tokenCache;
+    private final String secretKeyString;
+    private final long tokenDefaultMaxLifetime;
+    private final long tokenDefaultRenewLifetime;
+    private final long tokenRemoverScanInterval;
+    long tokenRemoverScanLastTime;
+
+    private DelegationTokenControlManager(
+        LogContext logContext,
+        SnapshotRegistry snapshotRegistry,

Review Comment:
   unused `snapshotRegistry`?



##########
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##########
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import 
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+    private Time time = Time.SYSTEM;
+
+    static class Builder {
+        private LogContext logContext = null;
+        private SnapshotRegistry snapshotRegistry = null;
+        private DelegationTokenCache tokenCache = null;
+        private String secretKeyString = null;
+        private long tokenDefaultMaxLifetime = 0;
+        private long tokenDefaultRenewLifetime = 0;
+        private long tokenRemoverScanInterval = 0;
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+            this.snapshotRegistry = snapshotRegistry;
+            return this;
+        }
+
+        Builder setTokenCache(DelegationTokenCache tokenCache) {
+            this.tokenCache = tokenCache;
+            return this;
+        }
+
+        Builder setTokenKeyString(String secretKeyString) {
+            this.secretKeyString = secretKeyString;
+            return this;
+        }
+
+        Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
+            this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+            return this;
+        }
+
+        Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) 
{
+            this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+            return this;
+        }
+
+        Builder setDelegationTokenExpiryCheckIntervalMs(long 
tokenRemoverScanInterval) {
+            this.tokenRemoverScanInterval = tokenRemoverScanInterval;
+            return this;
+        }
+
+        DelegationTokenControlManager build() {
+            if (logContext == null) logContext = new LogContext();
+            if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+            return new DelegationTokenControlManager(
+              logContext,
+              snapshotRegistry,
+              tokenCache,
+              secretKeyString,
+              tokenDefaultMaxLifetime,
+              tokenDefaultRenewLifetime,
+              tokenRemoverScanInterval);
+        }
+    }
+
+    private final Logger log;
+    private final DelegationTokenCache tokenCache;
+    private final String secretKeyString;
+    private final long tokenDefaultMaxLifetime;
+    private final long tokenDefaultRenewLifetime;
+    private final long tokenRemoverScanInterval;
+    long tokenRemoverScanLastTime;
+
+    private DelegationTokenControlManager(
+        LogContext logContext,
+        SnapshotRegistry snapshotRegistry,
+        DelegationTokenCache tokenCache,
+        String secretKeyString,
+        long tokenDefaultMaxLifetime,
+        long tokenDefaultRenewLifetime,
+        long tokenRemoverScanInterval
+    ) {
+        this.log = logContext.logger(DelegationTokenControlManager.class);
+        this.tokenCache = tokenCache;
+        this.secretKeyString = secretKeyString;
+        this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+        this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+        this.tokenRemoverScanInterval = tokenRemoverScanInterval;
+        this.tokenRemoverScanLastTime = time.milliseconds();
+    }
+
+    public static byte[] toBytes(String str) {
+        return str.getBytes(StandardCharsets.UTF_8);
+    }
+
+    private byte[] createHmac(String tokenId) throws Exception {
+        Mac mac = Mac.getInstance("HmacSHA512");
+        SecretKeySpec secretKey = new SecretKeySpec(toBytes(secretKeyString), 
mac.getAlgorithm());
+
+        mac.init(secretKey);
+        return mac.doFinal(toBytes(tokenId));
+    }
+
+    private TokenInformation getToken(byte[] hmac) {
+        String base64Pwd = Base64.getEncoder().encodeToString(hmac);
+        return tokenCache.tokenForHmac(base64Pwd);
+    }
+
+    private boolean allowedToRenew(TokenInformation tokenInfo, KafkaPrincipal 
renewer) {
+        if (tokenInfo.owner().equals(renewer)) {
+            return true;
+        }
+        for (KafkaPrincipal validRenewer : tokenInfo.renewers()) {
+            if (validRenewer.equals(renewer)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public boolean isEnabled() {
+        if (secretKeyString != null) {
+            return true;
+        }
+        return false;
+    }
+
+    /*
+     * Pass in the MetadataVersion so that we can return a response to the 
caller 
+     * if the current metadataVersion is too low.
+     */
+    public ControllerResult<CreateDelegationTokenResponseData> 
createDelegationToken(
+        ControllerRequestContext context,
+        CreateDelegationTokenRequestData requestData,
+        MetadataVersion metadataVersion
+    ) {
+        long now = time.milliseconds();
+        long maxLifeTime = tokenDefaultMaxLifetime;
+        if (requestData.maxLifetimeMs() > 0) {
+            maxLifeTime = Math.min(maxLifeTime, requestData.maxLifetimeMs());
+        }
+
+        long maxTimestamp = now + maxLifeTime;
+        long expiryTimestamp = Math.min(maxTimestamp, now + 
tokenDefaultRenewLifetime);
+
+        String tokenId = Uuid.randomUuid().toString();
+
+        KafkaPrincipal owner = context.principal();
+        if ((requestData.ownerPrincipalName() != null) && 
+            (!requestData.ownerPrincipalName().isEmpty())) {
+
+            owner = new KafkaPrincipal(requestData.ownerPrincipalType(), 
requestData.ownerPrincipalName());
+        }
+        CreateDelegationTokenResponseData responseData = new 
CreateDelegationTokenResponseData()
+                .setPrincipalName(owner.getName())
+                .setPrincipalType(owner.getPrincipalType())
+                .setTokenRequesterPrincipalName(context.principal().getName())
+                
.setTokenRequesterPrincipalType(context.principal().getPrincipalType());
+
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+
+        if (secretKeyString == null) {
+            // DelegationTokens are not enabled
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_AUTH_DISABLED.code()));
+        }
+
+        if (!metadataVersion.isDelegationTokenSupported()) {
+            // DelegationTokens are not supported in this metadata version
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(UNSUPPORTED_VERSION.code()));
+        }
+
+        List<KafkaPrincipal> renewers = new ArrayList<KafkaPrincipal>();
+        for (CreatableRenewers renewer : requestData.renewers()) {
+            if (renewer.principalType().equals(KafkaPrincipal.USER_TYPE)) {
+                renewers.add(new KafkaPrincipal(renewer.principalType(), 
renewer.principalName()));
+            } else {
+                return ControllerResult.atomicOf(records, 
responseData.setErrorCode(INVALID_PRINCIPAL_TYPE.code()));
+            }
+        }
+
+        byte[] hmac;
+        try {
+            hmac = createHmac(tokenId);
+        } catch (Throwable e) {
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(ApiError.fromThrowable(e).error().code()));
+        }
+
+        TokenInformation newTokenInformation = new TokenInformation(tokenId, 
owner,
+            context.principal(), renewers, now, maxTimestamp, expiryTimestamp);
+
+        DelegationTokenData newDelegationTokenData = new 
DelegationTokenData(newTokenInformation);
+
+        responseData
+                .setErrorCode(NONE.code())
+                .setIssueTimestampMs(now)
+                .setExpiryTimestampMs(expiryTimestamp)
+                .setMaxTimestampMs(maxTimestamp)
+                .setTokenId(tokenId)
+                .setHmac(hmac);
+
+        records.add(new 
ApiMessageAndVersion(newDelegationTokenData.toRecord(), (short) 0));
+        return ControllerResult.atomicOf(records, responseData);
+    }
+
+    public ControllerResult<RenewDelegationTokenResponseData> 
renewDelegationToken(
+        ControllerRequestContext context,
+        RenewDelegationTokenRequestData requestData,
+        MetadataVersion metadataVersion
+    ) {
+        long now = time.milliseconds();
+
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        RenewDelegationTokenResponseData responseData = new 
RenewDelegationTokenResponseData();
+
+        TokenInformation myTokenInformation = getToken(requestData.hmac());
+
+        if (myTokenInformation == null) {
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_NOT_FOUND.code()));
+        }
+
+        if (myTokenInformation.maxTimestamp() < now || 
myTokenInformation.expiryTimestamp() < now) {
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_EXPIRED.code()));
+        }
+
+        if (!allowedToRenew(myTokenInformation, context.principal())) {
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_OWNER_MISMATCH.code()));
+        }
+
+        long renewLifeTime = tokenDefaultRenewLifetime;
+        if (requestData.renewPeriodMs() > 0) {
+            renewLifeTime = Math.min(renewLifeTime, 
requestData.renewPeriodMs());
+        }
+        long renewTimeStamp = now + renewLifeTime;
+        long expiryTimestamp = Math.min(myTokenInformation.maxTimestamp(), 
renewTimeStamp);
+
+        myTokenInformation.setExpiryTimestamp(expiryTimestamp);
+
+        DelegationTokenData newDelegationTokenData = new 
DelegationTokenData(myTokenInformation);
+
+        responseData
+            .setErrorCode(NONE.code())
+            .setExpiryTimestampMs(expiryTimestamp);
+
+        records.add(new 
ApiMessageAndVersion(newDelegationTokenData.toRecord(), (short) 0));
+        return ControllerResult.atomicOf(records, responseData);
+    }
+
+    public ControllerResult<ExpireDelegationTokenResponseData> 
expireDelegationToken(
+        ControllerRequestContext context,
+        ExpireDelegationTokenRequestData requestData,
+        MetadataVersion metadataVersion
+    ) {
+        long now = time.milliseconds();
+
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        ExpireDelegationTokenResponseData responseData = new 
ExpireDelegationTokenResponseData();
+
+        if (secretKeyString == null) {
+            // DelegationTokens are not enabled
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_AUTH_DISABLED.code()));
+        }
+
+        TokenInformation myTokenInformation = getToken(requestData.hmac());
+
+        if (myTokenInformation == null) {
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_NOT_FOUND.code()));
+        }
+
+        if (myTokenInformation.maxTimestamp() < now || 
myTokenInformation.expiryTimestamp() < now) {
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_EXPIRED.code()));
+        }
+
+        if (!allowedToRenew(myTokenInformation, context.principal())) {
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_OWNER_MISMATCH.code()));
+        }
+
+        if (requestData.expiryTimePeriodMs() < 0) { // expire immediately
+            responseData
+                .setErrorCode(NONE.code())
+                .setExpiryTimestampMs(requestData.expiryTimePeriodMs());
+            records.add(new ApiMessageAndVersion(new 
RemoveDelegationTokenRecord().
+                setTokenId(myTokenInformation.tokenId()), (short) 0));
+        } else {
+            long expiryTimestamp = Math.min(myTokenInformation.maxTimestamp(),
+                now + requestData.expiryTimePeriodMs());
+
+            responseData
+                .setErrorCode(NONE.code())
+                .setExpiryTimestampMs(expiryTimestamp);
+
+            myTokenInformation.setExpiryTimestamp(expiryTimestamp);
+
+            DelegationTokenData newDelegationTokenData = new 
DelegationTokenData(myTokenInformation);
+            records.add(new 
ApiMessageAndVersion(newDelegationTokenData.toRecord(), (short) 0));
+        }
+
+        return ControllerResult.atomicOf(records, responseData);
+    }
+
+    // Periodic call to remove expired DelegationTokens
+    public List<ApiMessageAndVersion> sweepExpiredDelegationTokens() {

Review Comment:
   Do we have any test for this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to