pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1282514451


##########
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import 
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {

Review Comment:
   > It seems to me that the overall way of creating/expiring etc. a token is 
very similar both in Zk and KRaft as we aim to keep the functionality mostly 
intact. Do you think it should be refactored to a common code that has 
`createToken`, `expireToken` etc. method that can be used both in this class 
and `DelegationTokenManagerZk`?
   
   The answer to why it is the way I coded it is long and I spent a lot of time 
trying different things until I finally settled on the current implementation.
   
   First, the original DelegationTokenManager was and is the interface for 
updating the TokenCache for DelegationTokens. Authentication using 
DelegationTokens is handled elsewhere using the TokenCache and has not changed 
at all. I felt that reimplementing the TokenCache and then updating the 
authentication mechanisms was too much work, so I had to keep some interface to 
the TokenCache for both Zk and KRaft.
   
   The original ZK implementation is to call the DelegationTokenManager for 
updates which queries the TokenCache for state, updates the token state and 
pushes the new token to Zk and updating the local TokenCache. The other brokers 
will pull the new state of the token and update their local TokenCache in the 
DelegationTokenManager. This workflow has not changed in the new code, but the 
Zk specific code has moved to DelegationTokenManagerZk which inherits the 
common code from DelegationTokenManager and overrides the methods it needs to 
achieve 100% backwards compatibility.
   
   The KRaft way is to forward the request to a controller which queries a 
local TokenCache for state, updates the token and pushes the token to the 
metadata log. Then on replay of the log, which is pushed to each of the 
controllers and brokers, theTokenCache is updated. The controller does not 
update the local TokenCache until the replay of the metadata log. 
   
   So I could potentially add more logic to the original DelegationTokenManager 
to handle these two distinct cases but the code gets even more ugly doing that, 
or I could just pull the Zk code out and have it interact with the TokenCache 
through the DelegationTokenManager and implement the KRaft version which also 
interacts with the TokenCache through the DelegationTokenManager.
   
   The other reason to move the Zk code to its own file is that in AK 4.0 which 
will be coming out in a year, there will be no more support for Zk. It will be 
easy to just delete DelegationTokenManagerZk when this happens.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to