[ 
https://issues.apache.org/jira/browse/KAFKA-6447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434307#comment-16434307
 ] 

ASF GitHub Bot commented on KAFKA-6447:
---------------------------------------

junrao closed pull request #4427: KAFKA-6447: Add Delegation Token Operations 
to KafkaAdminClient (KIP-249)
URL: https://github.com/apache/kafka/pull/4427
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/build.gradle b/build.gradle
index f83698050ed..69f560ec38f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -858,6 +858,7 @@ project(':clients') {
     include "**/org/apache/kafka/common/config/*"
     include "**/org/apache/kafka/common/security/auth/*"
     include "**/org/apache/kafka/server/policy/*"
+    include "**/org/apache/kafka/common/security/token/delegation/*"
   }
 }
 
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 0fec810a95b..2767132886d 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -10,7 +10,7 @@
 
     <!-- Clients -->
     <suppress checks="ClassFanOutComplexity"
-              
files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManagerTest|KafkaAdminClient|NetworkClient).java"/>
+              
files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManagerTest|KafkaAdminClient|NetworkClient|AdminClient).java"/>
     <suppress checks="ClassFanOutComplexity"
               files="(SaslServerAuthenticator|SaslAuthenticatorTest).java"/>
     <suppress checks="ClassFanOutComplexity"
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
index 897e127d557..53b77ce0d21 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -535,4 +535,158 @@ public DeleteRecordsResult 
deleteRecords(Map<TopicPartition, RecordsToDelete> re
      */
     public abstract DeleteRecordsResult deleteRecords(Map<TopicPartition, 
RecordsToDelete> recordsToDelete,
                                                       DeleteRecordsOptions 
options);
+
+    /**
+     * <p>Create a Delegation Token.</p>
+     *
+     * <p>This is a convenience method for {@link 
#createDelegationToken(CreateDelegationTokenOptions)} with default options.
+     * See the overload for more details.</p>
+     *
+     * @return                      The CreateDelegationTokenResult.
+     */
+    public CreateDelegationTokenResult createDelegationToken() {
+        return createDelegationToken(new CreateDelegationTokenOptions());
+    }
+
+
+    /**
+     * <p>Create a Delegation Token.</p>
+     *
+     * <p>This operation is supported by brokers with version 1.1.0 or 
higher.</p>
+     *
+     * <p>The following exceptions can be anticipated when calling {@code 
get()} on the futures obtained from the
+     * {@link CreateDelegationTokenResult#delegationToken() delegationToken()} 
method of the returned {@code CreateDelegationTokenResult}</p>
+     * <ul>
+     *     <li>{@link 
org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
+     *     If the request sent on PLAINTEXT/1-way SSL channels or delegation 
token authenticated channels.</li>
+     *     <li>{@link 
org.apache.kafka.common.errors.InvalidPrincipalTypeException}
+     *     if the renewers principal type is not supported.</li>
+     *     <li>{@link 
org.apache.kafka.common.errors.DelegationTokenDisabledException}
+     *     if the delegation token feature is disabled.</li>
+     *     <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *     if the request was not completed in within the given {@link 
CreateDelegationTokenOptions#timeoutMs()}.</li>
+     * </ul>
+     *
+     * @param options               The options to use when creating 
delegation token.
+     * @return                      The DeleteRecordsResult.
+     */
+    public abstract CreateDelegationTokenResult 
createDelegationToken(CreateDelegationTokenOptions options);
+
+
+    /**
+     * <p>Renew a Delegation Token.</p>
+     *
+     * <p>This is a convenience method for {@link 
#renewDelegationToken(byte[], RenewDelegationTokenOptions)} with default 
options.
+     * See the overload for more details.</p>
+     *
+     *
+     * @param hmac                  HMAC of the Delegation token
+     * @return                      The RenewDelegationTokenResult.
+     */
+    public RenewDelegationTokenResult renewDelegationToken(byte[] hmac) {
+        return renewDelegationToken(hmac, new RenewDelegationTokenOptions());
+    }
+
+    /**
+     * <p> Renew a Delegation Token.</p>
+     *
+     * <p>This operation is supported by brokers with version 1.1.0 or 
higher.</p>
+     *
+     * <p>The following exceptions can be anticipated when calling {@code 
get()} on the futures obtained from the
+     * {@link RenewDelegationTokenResult#expiryTimestamp() expiryTimestamp()} 
method of the returned {@code RenewDelegationTokenResult}</p>
+     * <ul>
+     *     <li>{@link 
org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
+     *     If the request sent on PLAINTEXT/1-way SSL channels or delegation 
token authenticated channels.</li>
+     *     <li>{@link 
org.apache.kafka.common.errors.DelegationTokenDisabledException}
+     *     if the delegation token feature is disabled.</li>
+     *     <li>{@link 
org.apache.kafka.common.errors.DelegationTokenNotFoundException}
+     *     if the delegation token is not found on server.</li>
+     *     <li>{@link 
org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException}
+     *     if the authenticated user is not owner/renewer of the token.</li>
+     *     <li>{@link 
org.apache.kafka.common.errors.DelegationTokenExpiredException}
+     *     if the delegation token is expired.</li>
+     *     <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *     if the request was not completed in within the given {@link 
RenewDelegationTokenOptions#timeoutMs()}.</li>
+     * </ul>
+     *
+     * @param hmac                  HMAC of the Delegation token
+     * @param options               The options to use when renewing 
delegation token.
+     * @return                      The RenewDelegationTokenResult.
+     */
+    public abstract RenewDelegationTokenResult renewDelegationToken(byte[] 
hmac, RenewDelegationTokenOptions options);
+
+    /**
+     * <p>Expire a Delegation Token.</p>
+     *
+     * <p>This is a convenience method for {@link 
#expireDelegationToken(byte[], ExpireDelegationTokenOptions)} with default 
options.
+     * This will expire the token immediately. See the overload for more 
details.</p>
+     *
+     * @param hmac                  HMAC of the Delegation token
+     * @return                      The ExpireDelegationTokenResult.
+     */
+    public ExpireDelegationTokenResult expireDelegationToken(byte[] hmac) {
+        return expireDelegationToken(hmac, new ExpireDelegationTokenOptions());
+    }
+
+    /**
+     * <p>Expire a Delegation Token.</p>
+     *
+     * <p>This operation is supported by brokers with version 1.1.0 or 
higher.</p>
+     *
+     * <p>The following exceptions can be anticipated when calling {@code 
get()} on the futures obtained from the
+     * {@link ExpireDelegationTokenResult#expiryTimestamp() expiryTimestamp()} 
method of the returned {@code ExpireDelegationTokenResult}</p>
+     * <ul>
+     *     <li>{@link 
org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
+     *     If the request sent on PLAINTEXT/1-way SSL channels or delegation 
token authenticated channels.</li>
+     *     <li>{@link 
org.apache.kafka.common.errors.DelegationTokenDisabledException}
+     *     if the delegation token feature is disabled.</li>
+     *     <li>{@link 
org.apache.kafka.common.errors.DelegationTokenNotFoundException}
+     *     if the delegation token is not found on server.</li>
+     *     <li>{@link 
org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException}
+     *     if the authenticated user is not owner/renewer of the requested 
token.</li>
+     *     <li>{@link 
org.apache.kafka.common.errors.DelegationTokenExpiredException}
+     *     if the delegation token is expired.</li>
+     *     <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *     if the request was not completed in within the given {@link 
ExpireDelegationTokenOptions#timeoutMs()}.</li>
+     * </ul>
+     *
+     * @param hmac                  HMAC of the Delegation token
+     * @param options               The options to use when expiring 
delegation token.
+     * @return                      The ExpireDelegationTokenResult.
+     */
+    public abstract ExpireDelegationTokenResult expireDelegationToken(byte[] 
hmac, ExpireDelegationTokenOptions options);
+
+    /**
+     *<p>Describe the Delegation Tokens.</p>
+     *
+     * <p>This is a convenience method for {@link 
#describeDelegationToken(DescribeDelegationTokenOptions)} with default options.
+     * This will return all the user owned tokens and tokens where user have 
Describe permission. See the overload for more details.</p>
+     *
+     * @return                      The DescribeDelegationTokenResult.
+     */
+    public DescribeDelegationTokenResult describeDelegationToken() {
+        return describeDelegationToken(new DescribeDelegationTokenOptions());
+    }
+
+    /**
+     * <p>Describe the Delegation Tokens.</p>
+     *
+     * <p>This operation is supported by brokers with version 1.1.0 or 
higher.</p>
+     *
+     * <p>The following exceptions can be anticipated when calling {@code 
get()} on the futures obtained from the
+     * {@link DescribeDelegationTokenResult#delegationTokens() 
delegationTokens()} method of the returned {@code 
DescribeDelegationTokenResult}</p>
+     * <ul>
+     *     <li>{@link 
org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
+     *     If the request sent on PLAINTEXT/1-way SSL channels or delegation 
token authenticated channels.</li>
+     *     <li>{@link 
org.apache.kafka.common.errors.DelegationTokenDisabledException}
+     *     if the delegation token feature is disabled.</li>
+     *     <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *     if the request was not completed in within the given {@link 
DescribeDelegationTokenOptions#timeoutMs()}.</li>
+     * </ul>
+     *
+     * @param options               The options to use when describing 
delegation tokens.
+     * @return                      The DescribeDelegationTokenResult.
+     */
+    public abstract DescribeDelegationTokenResult 
describeDelegationToken(DescribeDelegationTokenOptions options);
+
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenOptions.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenOptions.java
new file mode 100644
index 00000000000..1b77b943800
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenOptions.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+
+/**
+ * Options for {@link 
AdminClient#createDelegationToken(CreateDelegationTokenOptions)}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
[email protected]
+public class CreateDelegationTokenOptions extends 
AbstractOptions<CreateDelegationTokenOptions> {
+    private long maxLifeTimeMs = -1;
+    private List<KafkaPrincipal> renewers =  new LinkedList<>();
+
+    public CreateDelegationTokenOptions renewers(List<KafkaPrincipal> 
renewers) {
+        this.renewers = renewers;
+        return this;
+    }
+
+    public List<KafkaPrincipal> renewers() {
+        return renewers;
+    }
+
+    public CreateDelegationTokenOptions maxlifeTimeMs(long maxLifeTimeMs) {
+        this.maxLifeTimeMs = maxLifeTimeMs;
+        return this;
+    }
+
+    public long maxlifeTimeMs() {
+        return maxLifeTimeMs;
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenResult.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenResult.java
new file mode 100644
index 00000000000..043cbe87fef
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenResult.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+
+/**
+ * The result of the {@link 
KafkaAdminClient#createDelegationToken(CreateDelegationTokenOptions)} call.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
[email protected]
+public class CreateDelegationTokenResult {
+    private final KafkaFuture<DelegationToken> delegationToken;
+
+    CreateDelegationTokenResult(KafkaFuture<DelegationToken> delegationToken) {
+        this.delegationToken = delegationToken;
+    }
+
+    /**
+     * Returns a future which yields a delegation token
+     */
+    public KafkaFuture<DelegationToken> delegationToken() {
+        return delegationToken;
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenOptions.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenOptions.java
new file mode 100644
index 00000000000..60b99354e35
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenOptions.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+
+/**
+ * Options for {@link 
AdminClient#describeDelegationToken(DescribeDelegationTokenOptions)}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
[email protected]
+public class DescribeDelegationTokenOptions extends 
AbstractOptions<DescribeDelegationTokenOptions> {
+    private List<KafkaPrincipal> owners;
+
+    /**
+     * if owners is null, all the user owned tokens and tokens where user have 
Describe permission
+     * will be returned.
+     * @param owners
+     * @return this instance
+     */
+    public DescribeDelegationTokenOptions owners(List<KafkaPrincipal> owners) {
+        this.owners = owners;
+        return this;
+    }
+
+    public List<KafkaPrincipal> owners() {
+        return owners;
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenResult.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenResult.java
new file mode 100644
index 00000000000..7a9d4b9dd97
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenResult.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+
+/**
+ * The result of the {@link 
KafkaAdminClient#describeDelegationToken(DescribeDelegationTokenOptions)} call.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
[email protected]
+public class DescribeDelegationTokenResult {
+    private final KafkaFuture<List<DelegationToken>> delegationTokens;
+
+    DescribeDelegationTokenResult(KafkaFuture<List<DelegationToken>> 
delegationTokens) {
+        this.delegationTokens = delegationTokens;
+    }
+
+    /**
+     * Returns a future which yields list of delegation tokens
+     */
+    public KafkaFuture<List<DelegationToken>> delegationTokens() {
+        return delegationTokens;
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java
new file mode 100644
index 00000000000..138cd4e69e4
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for {@link AdminClient#expireDelegationToken(byte[], 
ExpireDelegationTokenOptions)}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
[email protected]
+public class ExpireDelegationTokenOptions extends 
AbstractOptions<ExpireDelegationTokenOptions> {
+    private long expiryTimePeriodMs = -1L;
+
+    public ExpireDelegationTokenOptions expiryTimePeriodMs(long 
expiryTimePeriodMs) {
+        this.expiryTimePeriodMs = expiryTimePeriodMs;
+        return this;
+    }
+
+    public long expiryTimePeriodMs() {
+        return expiryTimePeriodMs;
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenResult.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenResult.java
new file mode 100644
index 00000000000..41782bdcb5c
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenResult.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * The result of the {@link KafkaAdminClient#expireDelegationToken(byte[], 
ExpireDelegationTokenOptions)} call.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
[email protected]
+public class ExpireDelegationTokenResult {
+    private final KafkaFuture<Long> expiryTimestamp;
+
+    ExpireDelegationTokenResult(KafkaFuture<Long> expiryTimestamp) {
+        this.expiryTimestamp = expiryTimestamp;
+    }
+
+    /**
+     * Returns a future which yields expiry timestamp
+     */
+    public KafkaFuture<Long> expiryTimestamp() {
+        return expiryTimestamp;
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 511895354ab..3ac0e285622 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -69,6 +69,8 @@
 import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
 import org.apache.kafka.common.requests.CreateAclsResponse;
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
+import org.apache.kafka.common.requests.CreateDelegationTokenRequest;
+import org.apache.kafka.common.requests.CreateDelegationTokenResponse;
 import org.apache.kafka.common.requests.CreatePartitionsRequest;
 import org.apache.kafka.common.requests.CreatePartitionsResponse;
 import org.apache.kafka.common.requests.CreateTopicsRequest;
@@ -85,12 +87,20 @@
 import org.apache.kafka.common.requests.DescribeAclsResponse;
 import org.apache.kafka.common.requests.DescribeConfigsRequest;
 import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.requests.DescribeDelegationTokenRequest;
+import org.apache.kafka.common.requests.DescribeDelegationTokenResponse;
 import org.apache.kafka.common.requests.DescribeLogDirsRequest;
 import org.apache.kafka.common.requests.DescribeLogDirsResponse;
+import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
+import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.RenewDelegationTokenRequest;
+import org.apache.kafka.common.requests.RenewDelegationTokenResponse;
 import org.apache.kafka.common.requests.Resource;
 import org.apache.kafka.common.requests.ResourceType;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.LogContext;
@@ -2072,4 +2082,131 @@ void handleFailure(Throwable throwable) {
 
         return new DeleteRecordsResult(new HashMap<TopicPartition, 
KafkaFuture<DeletedRecords>>(futures));
     }
+
+    @Override
+    public CreateDelegationTokenResult createDelegationToken(final 
CreateDelegationTokenOptions options) {
+        final KafkaFutureImpl<DelegationToken> delegationTokenFuture = new 
KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        runnable.call(new Call("createDelegationToken", calcDeadlineMs(now, 
options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+
+            @Override
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new 
CreateDelegationTokenRequest.Builder(options.renewers(), 
options.maxlifeTimeMs());
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                CreateDelegationTokenResponse response = 
(CreateDelegationTokenResponse) abstractResponse;
+                if (response.hasError()) {
+                    
delegationTokenFuture.completeExceptionally(response.error().exception());
+                } else {
+                    TokenInformation tokenInfo =  new 
TokenInformation(response.tokenId(), response.owner(),
+                        options.renewers(), response.issueTimestamp(), 
response.maxTimestamp(), response.expiryTimestamp());
+                    DelegationToken token = new DelegationToken(tokenInfo, 
response.hmacBytes());
+                    delegationTokenFuture.complete(token);
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                delegationTokenFuture.completeExceptionally(throwable);
+            }
+        }, now);
+
+        return new CreateDelegationTokenResult(delegationTokenFuture);
+    }
+
+    @Override
+    public RenewDelegationTokenResult renewDelegationToken(final byte[] hmac, 
final RenewDelegationTokenOptions options) {
+        final KafkaFutureImpl<Long>  expiryTimeFuture = new 
KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        runnable.call(new Call("renewDelegationToken", calcDeadlineMs(now, 
options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+
+            @Override
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new RenewDelegationTokenRequest.Builder(hmac, 
options.renewTimePeriodMs());
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                RenewDelegationTokenResponse response = 
(RenewDelegationTokenResponse) abstractResponse;
+                if (response.hasError()) {
+                    
expiryTimeFuture.completeExceptionally(response.error().exception());
+                } else {
+                    expiryTimeFuture.complete(response.expiryTimestamp());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                expiryTimeFuture.completeExceptionally(throwable);
+            }
+        }, now);
+
+        return new RenewDelegationTokenResult(expiryTimeFuture);
+    }
+
+    @Override
+    public ExpireDelegationTokenResult expireDelegationToken(final byte[] 
hmac, final ExpireDelegationTokenOptions options) {
+        final KafkaFutureImpl<Long>  expiryTimeFuture = new 
KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        runnable.call(new Call("expireDelegationToken", calcDeadlineMs(now, 
options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+
+            @Override
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new ExpireDelegationTokenRequest.Builder(hmac, 
options.expiryTimePeriodMs());
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                ExpireDelegationTokenResponse response = 
(ExpireDelegationTokenResponse) abstractResponse;
+                if (response.hasError()) {
+                    
expiryTimeFuture.completeExceptionally(response.error().exception());
+                } else {
+                    expiryTimeFuture.complete(response.expiryTimestamp());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                expiryTimeFuture.completeExceptionally(throwable);
+            }
+        }, now);
+
+        return new ExpireDelegationTokenResult(expiryTimeFuture);
+    }
+
+    @Override
+    public DescribeDelegationTokenResult describeDelegationToken(final 
DescribeDelegationTokenOptions options) {
+        final KafkaFutureImpl<List<DelegationToken>>  tokensFuture = new 
KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        runnable.call(new Call("describeDelegationToken", calcDeadlineMs(now, 
options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+
+            @Override
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new 
DescribeDelegationTokenRequest.Builder(options.owners());
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                DescribeDelegationTokenResponse response = 
(DescribeDelegationTokenResponse) abstractResponse;
+                if (response.hasError()) {
+                    
tokensFuture.completeExceptionally(response.error().exception());
+                } else {
+                    tokensFuture.complete(response.tokens());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                tokensFuture.completeExceptionally(throwable);
+            }
+        }, now);
+
+        return new DescribeDelegationTokenResult(tokensFuture);
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenOptions.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenOptions.java
new file mode 100644
index 00000000000..238dc4a3494
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenOptions.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for {@link AdminClient#renewDelegationToken(byte[], 
RenewDelegationTokenOptions)}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
[email protected]
+public class RenewDelegationTokenOptions extends 
AbstractOptions<RenewDelegationTokenOptions> {
+    private long renewTimePeriodMs = -1;
+
+    public RenewDelegationTokenOptions renewTimePeriodMs(long 
renewTimePeriodMs) {
+        this.renewTimePeriodMs = renewTimePeriodMs;
+        return this;
+    }
+
+    public long renewTimePeriodMs() {
+        return renewTimePeriodMs;
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenResult.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenResult.java
new file mode 100644
index 00000000000..38cdf1ae1b2
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenResult.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * The result of the {@link KafkaAdminClient#expireDelegationToken(byte[], 
ExpireDelegationTokenOptions)} call.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
[email protected]
+public class RenewDelegationTokenResult {
+    private final KafkaFuture<Long> expiryTimestamp;
+
+    RenewDelegationTokenResult(KafkaFuture<Long> expiryTimestamp) {
+        this.expiryTimestamp = expiryTimestamp;
+    }
+
+    /**
+     * Returns a future which yields expiry timestamp
+     */
+    public KafkaFuture<Long> expiryTimestamp() {
+        return expiryTimestamp;
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java 
b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
index 80ccb7e1382..078d844e85f 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
@@ -26,7 +26,7 @@
 import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
 import org.apache.kafka.common.security.authenticator.CredentialCache;
 import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
+import 
org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache;
 import org.apache.kafka.common.utils.Utils;
 
 import java.util.Collections;
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java 
b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 5502164563b..3985c7e97fe 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -41,7 +41,7 @@
 import org.apache.kafka.common.security.scram.internal.ScramMechanism;
 import 
org.apache.kafka.common.security.scram.internal.ScramServerCallbackHandler;
 import org.apache.kafka.common.security.ssl.SslFactory;
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
+import 
org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache;
 import org.apache.kafka.common.utils.Java;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
index dba29eafe99..7ba270a6153 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
@@ -184,4 +184,8 @@ public Errors error() {
     public List<DelegationToken> tokens() {
         return tokens;
     }
+
+    public boolean hasError() {
+        return this.error != Errors.NONE;
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
index 0d43440d329..40f0aadc0bb 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
@@ -88,9 +88,9 @@ public long expiryTimePeriod() {
         private final ByteBuffer hmac;
         private final long expiryTimePeriod;
 
-        public Builder(ByteBuffer hmac, long expiryTimePeriod) {
+        public Builder(byte[] hmac, long expiryTimePeriod) {
             super(ApiKeys.EXPIRE_DELEGATION_TOKEN);
-            this.hmac = hmac;
+            this.hmac = ByteBuffer.wrap(hmac);
             this.expiryTimePeriod = expiryTimePeriod;
         }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java
index f7e0ec44168..1a673bc7020 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java
@@ -93,4 +93,8 @@ protected Struct toStruct(short version) {
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
+
+    public boolean hasError() {
+        return this.error != Errors.NONE;
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
index 4a4b762a72a..a65c705fb66 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
@@ -88,9 +88,9 @@ public long renewTimePeriod() {
         private final ByteBuffer hmac;
         private final long renewTimePeriod;
 
-        public Builder(ByteBuffer hmac, long renewTimePeriod) {
+        public Builder(byte[] hmac, long renewTimePeriod) {
             super(ApiKeys.RENEW_DELEGATION_TOKEN);
-            this.hmac = hmac;
+            this.hmac = ByteBuffer.wrap(hmac);
             this.renewTimePeriod = renewTimePeriod;
         }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java
index 1885b480a96..3233f5c1d7b 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java
@@ -93,4 +93,8 @@ public Errors error() {
     public long expiryTimestamp() {
         return expiryTimestamp;
     }
+
+    public boolean hasError() {
+        return this.error != Errors.NONE;
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramSaslServer.java
 
b/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramSaslServer.java
index deee0b8fb33..60857279f59 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramSaslServer.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramSaslServer.java
@@ -40,7 +40,7 @@
 import 
org.apache.kafka.common.security.scram.internal.ScramMessages.ClientFirstMessage;
 import 
org.apache.kafka.common.security.scram.internal.ScramMessages.ServerFinalMessage;
 import 
org.apache.kafka.common.security.scram.internal.ScramMessages.ServerFirstMessage;
-import 
org.apache.kafka.common.security.token.delegation.DelegationTokenCredentialCallback;
+import 
org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCredentialCallback;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramServerCallbackHandler.java
 
b/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramServerCallbackHandler.java
index 377aa3d3df5..9a3f0dc66b7 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramServerCallbackHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramServerCallbackHandler.java
@@ -29,8 +29,8 @@
 import org.apache.kafka.common.security.authenticator.CredentialCache;
 import org.apache.kafka.common.security.scram.ScramCredential;
 import org.apache.kafka.common.security.scram.ScramCredentialCallback;
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
-import 
org.apache.kafka.common.security.token.delegation.DelegationTokenCredentialCallback;
+import 
org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache;
+import 
org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCredentialCallback;
 
 public class ScramServerCallbackHandler implements AuthenticateCallbackHandler 
{
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationToken.java
 
b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationToken.java
index 05ccbda2fe6..e1f97c1b72d 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationToken.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationToken.java
@@ -16,11 +16,16 @@
  */
 package org.apache.kafka.common.security.token.delegation;
 
+import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.utils.Base64;
 
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 
+/**
+ * A class representing a delegation token.
+ *
+ */
[email protected]
 public class DelegationToken {
     private TokenInformation tokenInformation;
     private byte[] hmac;
@@ -42,10 +47,6 @@ public String hmacAsBase64String() {
         return Base64.encoder().encodeToString(hmac);
     }
 
-    public ByteBuffer hmacBuffer() {
-        return ByteBuffer.wrap(hmac);
-    }
-
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java
 
b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java
index 1d500d21eef..ffd2af3f1c2 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java
@@ -16,11 +16,17 @@
  */
 package org.apache.kafka.common.security.token.delegation;
 
+import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 
 import java.util.ArrayList;
 import java.util.Collection;
 
+/**
+ * A class representing a delegation token details.
+ *
+ */
[email protected]
 public class TokenInformation {
 
     private KafkaPrincipal owner;
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCache.java
 
b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCache.java
similarity index 95%
rename from 
clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCache.java
rename to 
clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCache.java
index adea210e678..c05b7350b41 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCache.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCache.java
@@ -15,12 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.common.security.token.delegation;
+package org.apache.kafka.common.security.token.delegation.internal;
 
 import org.apache.kafka.common.security.authenticator.CredentialCache;
 import org.apache.kafka.common.security.scram.ScramCredential;
 import org.apache.kafka.common.security.scram.internal.ScramCredentialUtils;
 import org.apache.kafka.common.security.scram.internal.ScramMechanism;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
 
 import java.util.Collection;
 import java.util.HashMap;
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java
 
b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCredentialCallback.java
similarity index 94%
rename from 
clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java
rename to 
clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCredentialCallback.java
index 7490a3e91b1..294d7b1d3a0 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCredentialCallback.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.common.security.token.delegation;
+package org.apache.kafka.common.security.token.delegation.internal;
 
 import org.apache.kafka.common.security.scram.ScramCredentialCallback;
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java 
b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index c141a8acac9..0f5df38f743 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -276,6 +276,26 @@ public DeleteRecordsResult 
deleteRecords(Map<TopicPartition, RecordsToDelete> re
         }
     }
 
+    @Override
+    public CreateDelegationTokenResult 
createDelegationToken(CreateDelegationTokenOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public RenewDelegationTokenResult renewDelegationToken(byte[] hmac, 
RenewDelegationTokenOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public ExpireDelegationTokenResult expireDelegationToken(byte[] hmac, 
ExpireDelegationTokenOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public DescribeDelegationTokenResult 
describeDelegationToken(DescribeDelegationTokenOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
     @Override
     public CreateAclsResult createAcls(Collection<AclBinding> acls, 
CreateAclsOptions options) {
         throw new UnsupportedOperationException("Not implemented yet");
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java 
b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index fab8e934d8e..68979a1e01f 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -44,7 +44,7 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
+import 
org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache;
 
 /**
  * Non-blocking EchoServer implementation that uses ChannelBuilder to create 
channels
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index bdbd1062685..c63cecdda28 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -1223,7 +1223,7 @@ private CreateDelegationTokenResponse 
createCreateTokenResponse() {
     }
 
     private RenewDelegationTokenRequest createRenewTokenRequest() {
-        return new 
RenewDelegationTokenRequest.Builder(ByteBuffer.wrap("test".getBytes()), 
System.currentTimeMillis()).build();
+        return new RenewDelegationTokenRequest.Builder("test".getBytes(), 
System.currentTimeMillis()).build();
     }
 
     private RenewDelegationTokenResponse createRenewTokenResponse() {
@@ -1231,7 +1231,7 @@ private RenewDelegationTokenResponse 
createRenewTokenResponse() {
     }
 
     private ExpireDelegationTokenRequest createExpireTokenRequest() {
-        return new 
ExpireDelegationTokenRequest.Builder(ByteBuffer.wrap("test".getBytes()), 
System.currentTimeMillis()).build();
+        return new ExpireDelegationTokenRequest.Builder("test".getBytes(), 
System.currentTimeMillis()).build();
     }
 
     private ExpireDelegationTokenResponse createExpireTokenResponse() {
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/scram/internal/ScramSaslServerTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/scram/internal/ScramSaslServerTest.java
index 3c4b82d7921..f6e43f9edcf 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/scram/internal/ScramSaslServerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/scram/internal/ScramSaslServerTest.java
@@ -23,7 +23,7 @@
 import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.security.authenticator.CredentialCache;
 import org.apache.kafka.common.security.scram.ScramCredential;
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
+import 
org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache;
 
 import org.junit.Before;
 import org.junit.Test;
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala 
b/core/src/main/scala/kafka/admin/AdminClient.scala
index c010ba0a4ac..bcc11fd4917 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -35,8 +35,6 @@ import 
org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion
 import org.apache.kafka.common.requests.DescribeGroupsResponse.GroupMetadata
 import org.apache.kafka.common.requests.OffsetFetchResponse
 import org.apache.kafka.common.utils.LogContext
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.security.token.delegation.{DelegationToken, 
TokenInformation}
 import org.apache.kafka.common.utils.{KafkaThread, Time, Utils}
 import org.apache.kafka.common.{Cluster, Node, TopicPartition}
 
@@ -342,33 +340,6 @@ class AdminClient(val time: Time,
     ConsumerGroupSummary(metadata.state, metadata.protocol, Some(consumers), 
coordinator)
   }
 
-  def createToken(renewers: List[KafkaPrincipal], maxTimePeriodMs: Long = -1): 
(Errors, DelegationToken) = {
-    val responseBody = sendAnyNode(ApiKeys.CREATE_DELEGATION_TOKEN, new 
CreateDelegationTokenRequest.Builder(renewers.asJava, maxTimePeriodMs))
-    val response = responseBody.asInstanceOf[CreateDelegationTokenResponse]
-    val tokenInfo = new TokenInformation(response.tokenId, response.owner, 
renewers.asJava,
-      response.issueTimestamp, response.maxTimestamp, response.expiryTimestamp)
-    (response.error, new DelegationToken(tokenInfo, response.hmacBytes))
-  }
-
-  def renewToken(hmac: ByteBuffer, renewTimePeriod: Long = -1): (Errors, Long) 
= {
-    val responseBody = sendAnyNode(ApiKeys.RENEW_DELEGATION_TOKEN, new 
RenewDelegationTokenRequest.Builder(hmac, renewTimePeriod))
-    val response = responseBody.asInstanceOf[RenewDelegationTokenResponse]
-    (response.error, response.expiryTimestamp)
-  }
-
-  def expireToken(hmac: ByteBuffer, expiryTimeStamp: Long = -1): (Errors, 
Long) = {
-    val responseBody = sendAnyNode(ApiKeys.EXPIRE_DELEGATION_TOKEN, new 
ExpireDelegationTokenRequest.Builder(hmac, expiryTimeStamp))
-    val response = responseBody.asInstanceOf[ExpireDelegationTokenResponse]
-    (response.error, response.expiryTimestamp)
-  }
-
-  def describeToken(owners: List[KafkaPrincipal]): (Errors, 
List[DelegationToken]) = {
-    val ownersList = if (owners == null) null else owners.asJava
-    val responseBody = sendAnyNode(ApiKeys.RENEW_DELEGATION_TOKEN, new 
DescribeDelegationTokenRequest.Builder(ownersList))
-    val response = responseBody.asInstanceOf[DescribeDelegationTokenResponse]
-    (response.error, response.tokens().asScala.toList)
-  }
-
   def deleteConsumerGroups(groups: List[String]): Map[String, Errors] = {
 
     def coordinatorLookup(group: String): Either[Node, Errors] = {
diff --git a/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala 
b/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
index 6c5d1ce925d..0e6ea86034b 100644
--- a/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
+++ b/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
@@ -17,12 +17,13 @@
 
 package kafka.admin
 
-import java.nio.ByteBuffer
+import java.text.SimpleDateFormat
+import java.util
 
-import joptsimple._
+import joptsimple.{ArgumentAcceptingOptionSpec, OptionParser}
 import kafka.utils.{CommandLineUtils, Exit, Logging}
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.clients.admin.{CreateDelegationTokenOptions, 
DescribeDelegationTokenOptions, ExpireDelegationTokenOptions, 
RenewDelegationTokenOptions, AdminClient => JAdminClient}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.security.token.delegation.DelegationToken
 import org.apache.kafka.common.utils.{Base64, SecurityUtils, Utils}
@@ -71,19 +72,20 @@ object DelegationTokenCommand extends Logging {
     }
   }
 
-  def createToken(adminClient: AdminClient, opts: 
DelegationTokenCommandOptions) = {
-    val renewerPrincipals = getPrincipals(opts, opts.renewPrincipalsOpt)
+  def createToken(adminClient: JAdminClient, opts: 
DelegationTokenCommandOptions): DelegationToken = {
+    val renewerPrincipals = getPrincipals(opts, 
opts.renewPrincipalsOpt).getOrElse(new util.LinkedList[KafkaPrincipal]())
     val maxLifeTimeMs = opts.options.valueOf(opts.maxLifeTimeOpt).longValue
 
     println("Calling create token operation with renewers :" + 
renewerPrincipals +" , max-life-time-period :"+ maxLifeTimeMs)
-    val response = adminClient.createToken(renewerPrincipals, maxLifeTimeMs)
-    response  match {
-        case (Errors.NONE, token) => println("Created delegation token with 
tokenId : %s".format(token.tokenInfo.tokenId)); printToken(List(token))
-        case (e, _) =>  throw new AdminOperationException(e.message)
-    }
+    val createDelegationTokenOptions = new 
CreateDelegationTokenOptions().maxlifeTimeMs(maxLifeTimeMs).renewers(renewerPrincipals)
+    val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+    val token = createResult.delegationToken().get()
+    println("Created delegation token with tokenId : 
%s".format(token.tokenInfo.tokenId)); printToken(List(token))
+    token
   }
 
   def printToken(tokens: List[DelegationToken]): Unit = {
+    val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
     print("\n%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format("TOKENID", 
"HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE"))
     for (token <- tokens) {
       val tokenInfo = token.tokenInfo
@@ -92,56 +94,59 @@ object DelegationTokenCommand extends Logging {
         token.hmacAsBase64String,
         tokenInfo.owner,
         tokenInfo.renewersAsString,
-        tokenInfo.issueTimestamp,
-        tokenInfo.expiryTimestamp,
-        tokenInfo.maxTimestamp))
+        dateFormat.format(tokenInfo.issueTimestamp),
+        dateFormat.format(tokenInfo.expiryTimestamp),
+        dateFormat.format(tokenInfo.maxTimestamp)))
       println()
     }
   }
 
-  private def getPrincipals(opts: DelegationTokenCommandOptions, 
principalOptionSpec: ArgumentAcceptingOptionSpec[String]): List[KafkaPrincipal] 
= {
+  private def getPrincipals(opts: DelegationTokenCommandOptions, 
principalOptionSpec: ArgumentAcceptingOptionSpec[String]): 
Option[util.List[KafkaPrincipal]] = {
     if (opts.options.has(principalOptionSpec))
-      opts.options.valuesOf(principalOptionSpec).asScala.map(s => 
SecurityUtils.parseKafkaPrincipal(s.trim)).toList
+      Some(opts.options.valuesOf(principalOptionSpec).asScala.map(s => 
SecurityUtils.parseKafkaPrincipal(s.trim)).toList.asJava)
     else
-      List.empty[KafkaPrincipal]
+      None
   }
 
-  def renewToken(adminClient: AdminClient, opts: 
DelegationTokenCommandOptions) = {
+  def renewToken(adminClient: JAdminClient, opts: 
DelegationTokenCommandOptions): Long = {
     val hmac = opts.options.valueOf(opts.hmacOpt)
     val renewTimePeriodMs = 
opts.options.valueOf(opts.renewTimePeriodOpt).longValue()
     println("Calling renew token operation with hmac :" + hmac +" , 
renew-time-period :"+ renewTimePeriodMs)
-    val response = 
adminClient.renewToken(ByteBuffer.wrap(Base64.decoder.decode(hmac)), 
renewTimePeriodMs)
-    response match {
-      case (Errors.NONE, expiryTimeStamp) => println("Completed renew 
operation. New expiry timestamp : %s".format(expiryTimeStamp))
-      case (e, expiryTimeStamp) => throw new AdminOperationException(e.message)
-    }
+    val renewResult = 
adminClient.renewDelegationToken(Base64.decoder.decode(hmac), new 
RenewDelegationTokenOptions().renewTimePeriodMs(renewTimePeriodMs))
+    val expiryTimeStamp = renewResult.expiryTimestamp().get()
+    val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
+    println("Completed renew operation. New expiry date : 
%s".format(dateFormat.format(expiryTimeStamp)))
+    expiryTimeStamp
   }
 
-  def expireToken(adminClient: AdminClient, opts: 
DelegationTokenCommandOptions) = {
+  def expireToken(adminClient: JAdminClient, opts: 
DelegationTokenCommandOptions): Long = {
     val hmac = opts.options.valueOf(opts.hmacOpt)
     val expiryTimePeriodMs = 
opts.options.valueOf(opts.expiryTimePeriodOpt).longValue()
     println("Calling expire token operation with hmac :" + hmac +" , 
expire-time-period : "+ expiryTimePeriodMs)
-    val response = 
adminClient.expireToken(ByteBuffer.wrap(Base64.decoder.decode(hmac)), 
expiryTimePeriodMs)
-    response match {
-      case (Errors.NONE, expiryTimeStamp) => println("Completed expire 
operation. New expiry timestamp : %s".format(expiryTimeStamp))
-      case (e, expiryTimeStamp) => throw new AdminOperationException(e.message)
-    }
+    val expireResult = 
adminClient.expireDelegationToken(Base64.decoder.decode(hmac), new 
ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs))
+    val expiryTimeStamp = expireResult.expiryTimestamp().get()
+    val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
+    println("Completed expire operation. New expiry date : 
%s".format(dateFormat.format(expiryTimeStamp)))
+    expiryTimeStamp
   }
 
-  def describeToken(adminClient: AdminClient, opts: 
DelegationTokenCommandOptions) = {
+  def describeToken(adminClient: JAdminClient, opts: 
DelegationTokenCommandOptions): List[DelegationToken] = {
     val ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt)
-    println("Calling describe token operation for owners :" + ownerPrincipals)
-    val response = adminClient.describeToken(ownerPrincipals)
-    response  match {
-      case (Errors.NONE, tokens) => println("Total Number of tokens : 
%s".format(tokens.size)); printToken(tokens)
-      case (e, tokens) => throw new AdminOperationException(e.message)
-    }
+    if (ownerPrincipals.isEmpty)
+      println("Calling describe token operation for current user.")
+    else
+      println("Calling describe token operation for owners :" + 
ownerPrincipals.get)
+
+    val describeResult = adminClient.describeDelegationToken(new 
DescribeDelegationTokenOptions().owners(ownerPrincipals.orNull))
+    val tokens = describeResult.delegationTokens().get().asScala.toList
+    println("Total number of tokens : %s".format(tokens.size)); 
printToken(tokens)
+    tokens
   }
 
-  private def createAdminClient(opts: DelegationTokenCommandOptions): 
AdminClient = {
+  private def createAdminClient(opts: DelegationTokenCommandOptions): 
JAdminClient = {
     val props = Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
     props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
opts.options.valueOf(opts.bootstrapServerOpt))
-    AdminClient.create(props)
+    JAdminClient.create(props)
   }
 
   class DelegationTokenCommandOptions(args: Array[String]) {
@@ -157,10 +162,11 @@ object DelegationTokenCommand extends Logging {
       .withRequiredArg
       .ofType(classOf[String])
 
-    val createOpt = parser.accepts("create", "Create a new delegation token.")
-    val renewOpt = parser.accepts("renew",  "Renew delegation token.")
-    val expiryOpt = parser.accepts("expire", "Expire delegation token.")
-    val describeOpt = parser.accepts("describe", "describe delegation tokens.")
+    val createOpt = parser.accepts("create", "Create a new delegation token. 
Use --renewer-principal option to pass renewers principals.")
+    val renewOpt = parser.accepts("renew",  "Renew delegation token. Use 
--renew-time-period option to set renew time period.")
+    val expiryOpt = parser.accepts("expire", "Expire delegation token. Use 
--expiry-time-period option to expire the token.")
+    val describeOpt = parser.accepts("describe", "Describe delegation tokens 
for the given principals. Use --owner-principal to pass owner/renewer 
principals." +
+      " If --owner-principal option is not supplied, all the user owned tokens 
and tokens where user have Describe permission will be returned.")
 
     val ownerPrincipalsOpt = parser.accepts("owner-principal", "owner is a 
kafka principal. It is should be in principalType:name format.")
       .withOptionalArg()
diff --git a/core/src/main/scala/kafka/security/CredentialProvider.scala 
b/core/src/main/scala/kafka/security/CredentialProvider.scala
index 6f9c2527735..f20808791a2 100644
--- a/core/src/main/scala/kafka/security/CredentialProvider.scala
+++ b/core/src/main/scala/kafka/security/CredentialProvider.scala
@@ -24,7 +24,7 @@ import org.apache.kafka.common.security.scram.ScramCredential
 import org.apache.kafka.common.config.ConfigDef
 import org.apache.kafka.common.config.ConfigDef._
 import org.apache.kafka.common.security.scram.internal.{ScramCredentialUtils, 
ScramMechanism}
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCache
+import 
org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache
 
 class CredentialProvider(scramMechanisms: Collection[String], val tokenCache: 
DelegationTokenCache) {
 
diff --git a/core/src/main/scala/kafka/server/DelegationTokenManager.scala 
b/core/src/main/scala/kafka/server/DelegationTokenManager.scala
index 4a947a17fcf..62a5e20322b 100644
--- a/core/src/main/scala/kafka/server/DelegationTokenManager.scala
+++ b/core/src/main/scala/kafka/server/DelegationTokenManager.scala
@@ -31,7 +31,8 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.security.scram.internal.{ScramFormatter, 
ScramMechanism}
 import org.apache.kafka.common.security.scram.ScramCredential
-import org.apache.kafka.common.security.token.delegation.{DelegationToken, 
DelegationTokenCache, TokenInformation}
+import 
org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache
+import org.apache.kafka.common.security.token.delegation.{DelegationToken, 
TokenInformation}
 import org.apache.kafka.common.utils.{Base64, Sanitizer, SecurityUtils, Time}
 
 import scala.collection.JavaConverters._
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 71056888c73..a0d2c799e6a 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -45,7 +45,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{ControlledShutdownRequest, 
ControlledShutdownResponse}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.scram.internal.ScramMechanism
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCache
+import 
org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache
 import org.apache.kafka.common.security.{JaasContext, JaasUtils}
 import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
 import org.apache.kafka.common.{ClusterResource, Node}
diff --git 
a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
 
b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
index 27a6d1127d5..56a3b8a3f39 100644
--- 
a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
@@ -18,10 +18,9 @@ package kafka.api
 
 import java.util
 
-import kafka.admin.AdminClient
 import kafka.server.KafkaConfig
 import kafka.utils.{JaasTestUtils, TestUtils, ZkUtils}
-import org.apache.kafka.clients.admin.AdminClientConfig
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.scram.internal.ScramMechanism
@@ -83,9 +82,8 @@ class DelegationTokenEndToEndAuthorizationTest extends 
EndToEndAuthorizationTest
     val clientLoginContext = jaasClientLoginModule(kafkaClientSaslMechanism)
     config.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)
 
-    val adminClient = AdminClient.create(config.asScala.toMap)
-    val (error, token)  = adminClient.createToken(List())
-
+    val adminClient = AdminClient.create(config)
+    val token = adminClient.createDelegationToken().delegationToken().get()
     //wait for token to reach all the brokers
     TestUtils.waitUntilTrue(() => servers.forall(server => 
!server.tokenCache.tokens().isEmpty),
       "Timed out waiting for token to propagate to all servers")
diff --git 
a/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala 
b/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala
new file mode 100644
index 00000000000..6ae8f5e83ac
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala
@@ -0,0 +1,147 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.admin
+
+import java.util
+
+import kafka.admin.DelegationTokenCommand.DelegationTokenCommandOptions
+import kafka.api.{KafkaSasl, SaslSetup}
+import kafka.server.{BaseRequestTest, KafkaConfig}
+import kafka.utils.{JaasTestUtils, TestUtils}
+import org.apache.kafka.clients.admin.AdminClientConfig
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+import scala.concurrent.ExecutionException
+
+class DelegationTokenCommandTest extends BaseRequestTest with SaslSetup {
+  override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
+  private val kafkaClientSaslMechanism = "PLAIN"
+  private val kafkaServerSaslMechanisms = List("PLAIN")
+  protected override val serverSaslProperties = 
Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, 
kafkaClientSaslMechanism))
+  protected override val clientSaslProperties = 
Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
+  var adminClient: org.apache.kafka.clients.admin.AdminClient = null
+
+  override def numBrokers = 1
+
+  @Before
+  override def setUp(): Unit = {
+    startSasl(jaasSections(kafkaServerSaslMechanisms, 
Some(kafkaClientSaslMechanism), KafkaSasl, 
JaasTestUtils.KafkaServerContextName))
+    super.setUp()
+  }
+
+  override def generateConfigs = {
+    val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect,
+      enableControlledShutdown = false, enableDeleteTopic = true,
+      interBrokerSecurityProtocol = Some(securityProtocol),
+      trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, 
enableToken = true)
+    props.foreach(propertyOverrides)
+    props.map(KafkaConfig.fromProps)
+  }
+
+  private def createAdminConfig():util.Map[String, Object] = {
+    val config = new util.HashMap[String, Object]
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    val securityProps: util.Map[Object, Object] =
+      TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, 
clientSaslProperties)
+    securityProps.asScala.foreach { case (key, value) => 
config.put(key.asInstanceOf[String], value) }
+    config
+  }
+
+  @Test
+  def testDelegationTokenRequests(): Unit = {
+    adminClient = 
org.apache.kafka.clients.admin.AdminClient.create(createAdminConfig)
+    val renewer1 = "User:renewer1"
+    val renewer2 = "User:renewer2"
+
+    // create token1 with renewer1
+    val tokenCreated = DelegationTokenCommand.createToken(adminClient, 
getCreateOpts(List(renewer1)))
+
+    var tokens = DelegationTokenCommand.describeToken(adminClient, 
getDescribeOpts(List()))
+    assertTrue(tokens.size == 1)
+    val token1 = tokens.head
+    assertEquals(token1, tokenCreated)
+
+    // create token2 with renewer2
+    val token2 = DelegationTokenCommand.createToken(adminClient, 
getCreateOpts(List(renewer2)))
+
+    tokens = DelegationTokenCommand.describeToken(adminClient, 
getDescribeOpts(List()))
+    assertTrue(tokens.size == 2)
+    assertEquals(Set(token1, token2), tokens.toSet)
+
+    //get tokens for renewer2
+    tokens = DelegationTokenCommand.describeToken(adminClient, 
getDescribeOpts(List(renewer2)))
+    assertTrue(tokens.size == 1)
+    assertEquals(Set(token2), tokens.toSet)
+
+    //test renewing tokens
+    val expiryTimestamp = DelegationTokenCommand.renewToken(adminClient, 
getRenewOpts(token1.hmacAsBase64String()))
+    val renewedToken = DelegationTokenCommand.describeToken(adminClient, 
getDescribeOpts(List(renewer1))).head
+    assertEquals(expiryTimestamp, renewedToken.tokenInfo().expiryTimestamp())
+
+    //test expire tokens
+    DelegationTokenCommand.expireToken(adminClient, 
getExpireOpts(token1.hmacAsBase64String()))
+    DelegationTokenCommand.expireToken(adminClient, 
getExpireOpts(token2.hmacAsBase64String()))
+
+    tokens = DelegationTokenCommand.describeToken(adminClient, 
getDescribeOpts(List()))
+    assertTrue(tokens.size == 0)
+
+    //create token with invalid renewer principal type
+    
intercept[ExecutionException](DelegationTokenCommand.createToken(adminClient, 
getCreateOpts(List("Group:Renewer3"))))
+
+    // try describing tokens for unknown owner
+    assertTrue(DelegationTokenCommand.describeToken(adminClient, 
getDescribeOpts(List("User:Unknown"))).isEmpty)
+  }
+
+  private def getCreateOpts(renewers: List[String]): 
DelegationTokenCommandOptions = {
+    val opts = ListBuffer("--bootstrap-server", brokerList, 
"--max-life-time-period", "-1",
+      "--command-config", "testfile", "--create")
+    renewers.foreach(renewer => opts ++= ListBuffer("--renewer-principal", 
renewer))
+    new DelegationTokenCommandOptions(opts.toArray)
+  }
+
+  private def getDescribeOpts(owners: List[String]): 
DelegationTokenCommandOptions = {
+    val opts = ListBuffer("--bootstrap-server", brokerList, 
"--command-config", "testfile", "--describe")
+    owners.foreach(owner => opts ++= ListBuffer("--owner-principal", owner))
+    new DelegationTokenCommandOptions(opts.toArray)
+  }
+
+  private def getRenewOpts(hmac: String): DelegationTokenCommandOptions = {
+    val opts = Array("--bootstrap-server", brokerList, "--command-config", 
"testfile", "--renew",
+      "--renew-time-period", "-1",
+      "--hmac", hmac)
+    new DelegationTokenCommandOptions(opts)
+  }
+
+  private def getExpireOpts(hmac: String): DelegationTokenCommandOptions = {
+    val opts = Array("--bootstrap-server", brokerList, "--command-config", 
"testfile", "--expire",
+      "--expiry-time-period", "-1",
+      "--hmac", hmac)
+    new DelegationTokenCommandOptions(opts)
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    if (adminClient != null)
+      adminClient.close()
+    super.tearDown()
+    closeSasl()
+  }
+}
diff --git 
a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
 
b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
index b8388b4cb49..6093622a188 100644
--- 
a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
@@ -30,7 +30,8 @@ import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.security.scram.internal.ScramMechanism
-import org.apache.kafka.common.security.token.delegation.{DelegationToken, 
DelegationTokenCache, TokenInformation}
+import 
org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache
+import org.apache.kafka.common.security.token.delegation.{DelegationToken, 
TokenInformation}
 import org.apache.kafka.common.utils.{MockTime, SecurityUtils}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
diff --git 
a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
 
b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
index 4c42dd27724..3d4be531985 100644
--- 
a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
@@ -19,14 +19,13 @@ package kafka.server
 import java.nio.ByteBuffer
 import java.util
 
-import kafka.admin.AdminClient
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.AdminClientConfig
-import org.apache.kafka.common.protocol.Errors
-import org.junit.Assert._
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
+import org.apache.kafka.common.errors.UnsupportedByAuthenticationException
 import org.junit.{After, Before, Test}
 
 import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionException
 
 class DelegationTokenRequestsOnPlainTextTest extends BaseRequestTest {
   var adminClient: AdminClient = null
@@ -49,21 +48,19 @@ class DelegationTokenRequestsOnPlainTextTest extends 
BaseRequestTest {
 
   @Test
   def testDelegationTokenRequests(): Unit = {
-    adminClient = AdminClient.create(createAdminConfig.asScala.toMap)
+    adminClient = AdminClient.create(createAdminConfig)
 
-    val createResponse = adminClient.createToken(List())
-    assertEquals(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, 
createResponse._1)
+    val createResult = adminClient.createDelegationToken()
+    
intercept[ExecutionException](createResult.delegationToken().get()).getCause.isInstanceOf[UnsupportedByAuthenticationException]
 
-    val describeResponse = adminClient.describeToken(List())
-    assertEquals(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, 
describeResponse._1)
+    val describeResult = adminClient.describeDelegationToken()
+    
intercept[ExecutionException](describeResult.delegationTokens().get()).getCause.isInstanceOf[UnsupportedByAuthenticationException]
 
-    //test renewing tokens
-    val renewResponse = adminClient.renewToken(ByteBuffer.wrap("".getBytes()))
-    assertEquals(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, renewResponse._1)
+    val renewResult = adminClient.renewDelegationToken("".getBytes())
+    
intercept[ExecutionException](renewResult.expiryTimestamp().get()).getCause.isInstanceOf[UnsupportedByAuthenticationException]
 
-    //test expire tokens tokens
-    val expireResponse = 
adminClient.expireToken(ByteBuffer.wrap("".getBytes()))
-    assertEquals(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, 
expireResponse._1)
+    val expireResult = adminClient.expireDelegationToken("".getBytes())
+    
intercept[ExecutionException](expireResult.expiryTimestamp().get()).getCause.isInstanceOf[UnsupportedByAuthenticationException]
   }
 
 
diff --git 
a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala 
b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
index 55bf5fd022c..a00275084fb 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
@@ -18,17 +18,17 @@ package kafka.server
 
 import java.util
 
-import kafka.admin.AdminClient
 import kafka.api.{KafkaSasl, SaslSetup}
 import kafka.utils.{JaasTestUtils, TestUtils}
-import org.apache.kafka.clients.admin.AdminClientConfig
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, 
CreateDelegationTokenOptions, DescribeDelegationTokenOptions}
+import org.apache.kafka.common.errors.InvalidPrincipalTypeException
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.SecurityUtils
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
 import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionException
 
 class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup {
   override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
@@ -46,15 +46,6 @@ class DelegationTokenRequestsTest extends BaseRequestTest 
with SaslSetup {
     super.setUp()
   }
 
-  def createAdminConfig():util.Map[String, Object] = {
-    val config = new util.HashMap[String, Object]
-    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    val securityProps: util.Map[Object, Object] =
-      TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, 
clientSaslProperties)
-    securityProps.asScala.foreach { case (key, value) => 
config.put(key.asInstanceOf[String], value) }
-    config
-  }
-
   override def generateConfigs = {
     val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect,
       enableControlledShutdown = false, enableDeleteTopic = true,
@@ -64,46 +55,73 @@ class DelegationTokenRequestsTest extends BaseRequestTest 
with SaslSetup {
     props.map(KafkaConfig.fromProps)
   }
 
+  private def createAdminConfig():util.Map[String, Object] = {
+    val config = new util.HashMap[String, Object]
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    val securityProps: util.Map[Object, Object] =
+      TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, 
clientSaslProperties)
+    securityProps.asScala.foreach { case (key, value) => 
config.put(key.asInstanceOf[String], value) }
+    config
+  }
+
   @Test
   def testDelegationTokenRequests(): Unit = {
-    adminClient = AdminClient.create(createAdminConfig.asScala.toMap)
-
-    // test creating token
-    val renewer1 = List(SecurityUtils.parseKafkaPrincipal("User:" + 
JaasTestUtils.KafkaPlainUser))
-    val tokenResult1 = adminClient.createToken(renewer1)
-    assertEquals(Errors.NONE, tokenResult1._1)
-    var token1 = adminClient.describeToken(null)._2.head
-    assertEquals(token1, tokenResult1._2)
+    adminClient = AdminClient.create(createAdminConfig)
+
+    // create token1 with renewer1
+    val renewer1 = 
List(SecurityUtils.parseKafkaPrincipal("User:renewer1")).asJava
+    val createResult1 = adminClient.createDelegationToken(new 
CreateDelegationTokenOptions().renewers(renewer1))
+    val tokenCreated = createResult1.delegationToken().get()
+
+    //test describe token
+    var tokens = adminClient.describeDelegationToken().delegationTokens().get()
+    assertTrue(tokens.size() == 1)
+    var token1 = tokens.get(0)
+    assertEquals(token1, tokenCreated)
+
+    // create token2 with renewer2
+    val renewer2 = 
List(SecurityUtils.parseKafkaPrincipal("User:renewer2")).asJava
+    val createResult2 = adminClient.createDelegationToken(new 
CreateDelegationTokenOptions().renewers(renewer2))
+    val token2 = createResult2.delegationToken().get()
+
+    //get all tokens
+    tokens = adminClient.describeDelegationToken().delegationTokens().get()
+    assertTrue(tokens.size() == 2)
+    assertEquals(Set(token1, token2), tokens.asScala.toSet)
+
+    //get tokens for renewer2
+    tokens = adminClient.describeDelegationToken(new 
DescribeDelegationTokenOptions().owners(renewer2)).delegationTokens().get()
+    assertTrue(tokens.size() == 1)
+    assertEquals(Set(token2), tokens.asScala.toSet)
 
     //test renewing tokens
-    val renewResponse = adminClient.renewToken(token1.hmacBuffer())
-    assertEquals(Errors.NONE, renewResponse._1)
-
-    token1 = adminClient.describeToken(null)._2.head
-    assertEquals(renewResponse._2, token1.tokenInfo().expiryTimestamp())
+    val renewResult = adminClient.renewDelegationToken(token1.hmac())
+    var expiryTimestamp = renewResult.expiryTimestamp().get()
 
-    //test describe tokens
-    val renewer2 = List(SecurityUtils.parseKafkaPrincipal("User:Renewer1"))
-    val tokenResult2 = adminClient.createToken(renewer2)
-    assertEquals(Errors.NONE, tokenResult2._1)
-    val token2 = tokenResult2._2
+    val describeResult = adminClient.describeDelegationToken()
+    val tokenId = token1.tokenInfo().tokenId()
+    token1 = describeResult.delegationTokens().get().asScala.filter(dt => 
dt.tokenInfo().tokenId() == tokenId).head
+    assertEquals(expiryTimestamp, token1.tokenInfo().expiryTimestamp())
 
-    assertTrue(adminClient.describeToken(null)._2.size == 2)
+    //test expire tokens
+    val expireResult1 = adminClient.expireDelegationToken(token1.hmac())
+    expiryTimestamp = expireResult1.expiryTimestamp().get()
 
-    //test expire tokens tokens
-    val expireResponse1 = adminClient.expireToken(token1.hmacBuffer())
-    assertEquals(Errors.NONE, expireResponse1._1)
+    val expireResult2 = adminClient.expireDelegationToken(token2.hmac())
+    expiryTimestamp = expireResult2.expiryTimestamp().get()
 
-    val expireResponse2 = adminClient.expireToken(token2.hmacBuffer())
-    assertEquals(Errors.NONE, expireResponse2._1)
-
-    assertTrue(adminClient.describeToken(null)._2.size == 0)
+    tokens = adminClient.describeDelegationToken().delegationTokens().get()
+    assertTrue(tokens.size == 0)
 
     //create token with invalid principal type
-    val renewer3 = List(SecurityUtils.parseKafkaPrincipal("Group:Renewer1"))
-    val tokenResult3 = adminClient.createToken(renewer3)
-    assertEquals(Errors.INVALID_PRINCIPAL_TYPE, tokenResult3._1)
-
+    val renewer3 = 
List(SecurityUtils.parseKafkaPrincipal("Group:Renewer3")).asJava
+    val createResult3 = adminClient.createDelegationToken(new 
CreateDelegationTokenOptions().renewers(renewer3))
+    
intercept[ExecutionException](createResult3.delegationToken().get()).getCause.isInstanceOf[InvalidPrincipalTypeException]
+
+    // try describing tokens for unknown owner
+    val unknownOwner = 
List(SecurityUtils.parseKafkaPrincipal("User:Unknown")).asJava
+    tokens = adminClient.describeDelegationToken(new 
DescribeDelegationTokenOptions().owners(unknownOwner)).delegationTokens().get()
+    assertTrue(tokens.isEmpty)
   }
 
   @After
diff --git 
a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
 
b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
index 0561cacb8f8..8f8842bbe97 100644
--- 
a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
@@ -19,17 +19,15 @@ package kafka.server
 import java.nio.ByteBuffer
 import java.util
 
-import kafka.admin.AdminClient
 import kafka.api.{KafkaSasl, SaslSetup}
 import kafka.utils.{JaasTestUtils, TestUtils}
-import org.apache.kafka.clients.admin.AdminClientConfig
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.utils.SecurityUtils
-import org.junit.Assert._
-import org.junit.{After, Before, Test}
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
+import org.apache.kafka.common.errors.DelegationTokenDisabledException
 import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.junit.{After, Before, Test}
 
 import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionException
 
 class DelegationTokenRequestsWithDisableTokenFeatureTest extends 
BaseRequestTest with SaslSetup {
   override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
@@ -58,23 +56,19 @@ class DelegationTokenRequestsWithDisableTokenFeatureTest 
extends BaseRequestTest
 
   @Test
   def testDelegationTokenRequests(): Unit = {
-    adminClient = AdminClient.create(createAdminConfig.asScala.toMap)
-
-    val renewer1 = List(SecurityUtils.parseKafkaPrincipal("User:" + 
JaasTestUtils.KafkaPlainUser))
-    val createResponse = adminClient.createToken(renewer1)
-    assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, createResponse._1)
+    adminClient = AdminClient.create(createAdminConfig)
 
-    val describeResponse = adminClient.describeToken(List())
-    assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, describeResponse._1)
+    val createResult = adminClient.createDelegationToken()
+    
intercept[ExecutionException](createResult.delegationToken().get()).getCause.isInstanceOf[DelegationTokenDisabledException]
 
-    //test renewing tokens
-    val renewResponse = adminClient.renewToken(ByteBuffer.wrap("".getBytes()))
-    assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, renewResponse._1)
+    val describeResult = adminClient.describeDelegationToken()
+    
intercept[ExecutionException](describeResult.delegationTokens().get()).getCause.isInstanceOf[DelegationTokenDisabledException]
 
-    //test expire tokens tokens
-    val expireResponse = 
adminClient.expireToken(ByteBuffer.wrap("".getBytes()))
-    assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, expireResponse._1)
+    val renewResult = adminClient.renewDelegationToken("".getBytes())
+    
intercept[ExecutionException](renewResult.expiryTimestamp().get()).getCause.isInstanceOf[DelegationTokenDisabledException]
 
+    val expireResult = adminClient.expireDelegationToken("".getBytes())
+    
intercept[ExecutionException](expireResult.expiryTimestamp().get()).getCause.isInstanceOf[DelegationTokenDisabledException]
   }
 
   @After
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala 
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 2a7d6d400d5..ed85415eacc 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -315,13 +315,13 @@ class RequestQuotaTest extends BaseRequestTest {
           new 
CreateDelegationTokenRequest.Builder(Collections.singletonList(SecurityUtils.parseKafkaPrincipal("User:test")),
 1000)
 
         case ApiKeys.EXPIRE_DELEGATION_TOKEN =>
-          new ExpireDelegationTokenRequest.Builder(ByteBuffer.allocate(10), 
1000)
+          new ExpireDelegationTokenRequest.Builder("".getBytes, 1000)
 
         case ApiKeys.DESCRIBE_DELEGATION_TOKEN =>
           new 
DescribeDelegationTokenRequest.Builder(Collections.singletonList(SecurityUtils.parseKafkaPrincipal("User:test")))
 
         case ApiKeys.RENEW_DELEGATION_TOKEN =>
-          new RenewDelegationTokenRequest.Builder(ByteBuffer.allocate(10), 
1000)
+          new RenewDelegationTokenRequest.Builder("".getBytes, 1000)
 
         case ApiKeys.DELETE_GROUPS =>
           new DeleteGroupsRequest.Builder(Collections.singleton("test-group"))


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Add Delegation Token Operations to KafkaAdminClient
> ---------------------------------------------------
>
>                 Key: KAFKA-6447
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6447
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Manikumar
>            Assignee: Manikumar
>            Priority: Major
>
> This JIRA is about adding delegation token operations to the new Admin Client 
> API.
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-249%3A+Add+Delegation+Token+Operations+to+KafkaAdminClient



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to