This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 47918f2 KAFKA-6447: Add Delegation Token Operations to KafkaAdminClient (KIP-249) (#4427) 47918f2 is described below commit 47918f2d79e907f6a6da599ab82a97c169722229 Author: Manikumar Reddy O <manikumar.re...@gmail.com> AuthorDate: Wed Apr 11 23:18:04 2018 +0530 KAFKA-6447: Add Delegation Token Operations to KafkaAdminClient (KIP-249) (#4427) Reviewers: Jun Rao <jun...@gmail.com> --- build.gradle | 1 + checkstyle/suppressions.xml | 2 +- .../apache/kafka/clients/admin/AdminClient.java | 154 +++++++++++++++++++++ .../admin/CreateDelegationTokenOptions.java | 53 +++++++ .../clients/admin/CreateDelegationTokenResult.java | 43 ++++++ .../admin/DescribeDelegationTokenOptions.java | 48 +++++++ .../admin/DescribeDelegationTokenResult.java | 45 ++++++ .../admin/ExpireDelegationTokenOptions.java} | 26 ++-- .../admin/ExpireDelegationTokenResult.java} | 29 ++-- .../kafka/clients/admin/KafkaAdminClient.java | 137 ++++++++++++++++++ .../admin/RenewDelegationTokenOptions.java} | 26 ++-- .../admin/RenewDelegationTokenResult.java} | 29 ++-- .../kafka/common/network/ChannelBuilders.java | 2 +- .../kafka/common/network/SaslChannelBuilder.java | 2 +- .../requests/DescribeDelegationTokenResponse.java | 4 + .../requests/ExpireDelegationTokenRequest.java | 4 +- .../requests/ExpireDelegationTokenResponse.java | 4 + .../requests/RenewDelegationTokenRequest.java | 4 +- .../requests/RenewDelegationTokenResponse.java | 4 + .../security/scram/internal/ScramSaslServer.java | 2 +- .../scram/internal/ScramServerCallbackHandler.java | 4 +- .../security/token/delegation/DelegationToken.java | 11 +- .../token/delegation/TokenInformation.java | 6 + .../{ => internal}/DelegationTokenCache.java | 4 +- .../DelegationTokenCredentialCallback.java | 2 +- .../kafka/clients/admin/MockAdminClient.java | 20 +++ .../apache/kafka/common/network/NioEchoServer.java | 2 +- .../kafka/common/requests/RequestResponseTest.java | 4 +- 
.../scram/internal/ScramSaslServerTest.java | 2 +- core/src/main/scala/kafka/admin/AdminClient.scala | 29 ---- .../scala/kafka/admin/DelegationTokenCommand.scala | 88 ++++++------ .../scala/kafka/security/CredentialProvider.scala | 2 +- .../kafka/server/DelegationTokenManager.scala | 3 +- core/src/main/scala/kafka/server/KafkaServer.scala | 2 +- .../DelegationTokenEndToEndAuthorizationTest.scala | 8 +- .../kafka/admin/DelegationTokenCommandTest.scala | 147 ++++++++++++++++++++ .../delegation/DelegationTokenManagerTest.scala | 3 +- .../DelegationTokenRequestsOnPlainTextTest.scala | 27 ++-- .../kafka/server/DelegationTokenRequestsTest.scala | 102 ++++++++------ ...nTokenRequestsWithDisableTokenFeatureTest.scala | 32 ++--- .../scala/unit/kafka/server/RequestQuotaTest.scala | 4 +- 41 files changed, 907 insertions(+), 214 deletions(-) diff --git a/build.gradle b/build.gradle index f836980..69f560e 100644 --- a/build.gradle +++ b/build.gradle @@ -858,6 +858,7 @@ project(':clients') { include "**/org/apache/kafka/common/config/*" include "**/org/apache/kafka/common/security/auth/*" include "**/org/apache/kafka/server/policy/*" + include "**/org/apache/kafka/common/security/token/delegation/*" } } diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 0fec810..2767132 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -10,7 +10,7 @@ <!-- Clients --> <suppress checks="ClassFanOutComplexity" - files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManagerTest|KafkaAdminClient|NetworkClient).java"/> + files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManagerTest|KafkaAdminClient|NetworkClient|AdminClient).java"/> <suppress checks="ClassFanOutComplexity" files="(SaslServerAuthenticator|SaslAuthenticatorTest).java"/> <suppress checks="ClassFanOutComplexity" diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java index 897e127..53b77ce 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java @@ -535,4 +535,158 @@ public abstract class AdminClient implements AutoCloseable { */ public abstract DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete, DeleteRecordsOptions options); + + /** + * <p>Create a Delegation Token.</p> + * + * <p>This is a convenience method for {@link #createDelegationToken(CreateDelegationTokenOptions)} with default options. + * See the overload for more details.</p> + * + * @return The CreateDelegationTokenResult. + */ + public CreateDelegationTokenResult createDelegationToken() { + return createDelegationToken(new CreateDelegationTokenOptions()); + } + + + /** + * <p>Create a Delegation Token.</p> + * + * <p>This operation is supported by brokers with version 1.1.0 or higher.</p> + * + * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the + * {@link CreateDelegationTokenResult#delegationToken() delegationToken()} method of the returned {@code CreateDelegationTokenResult}</p> + * <ul> + * <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException} + * If the request sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li> + * <li>{@link org.apache.kafka.common.errors.InvalidPrincipalTypeException} + * if the renewers principal type is not supported.</li> + * <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException} + * if the delegation token feature is disabled.</li> + * <li>{@link org.apache.kafka.common.errors.TimeoutException} + * if the request was not completed in within the given {@link CreateDelegationTokenOptions#timeoutMs()}.</li> + * </ul> + * + * @param options The options to use when creating delegation token. 
+ * @return The CreateDelegationTokenResult. + */ + public abstract CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions options); + + + /** + * <p>Renew a Delegation Token.</p> + * + * <p>This is a convenience method for {@link #renewDelegationToken(byte[], RenewDelegationTokenOptions)} with default options. + * See the overload for more details.</p> + * + * + * @param hmac HMAC of the Delegation token + * @return The RenewDelegationTokenResult. + */ + public RenewDelegationTokenResult renewDelegationToken(byte[] hmac) { + return renewDelegationToken(hmac, new RenewDelegationTokenOptions()); + } + + /** + * <p>Renew a Delegation Token.</p> + * + * <p>This operation is supported by brokers with version 1.1.0 or higher.</p> + * + * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the + * {@link RenewDelegationTokenResult#expiryTimestamp() expiryTimestamp()} method of the returned {@code RenewDelegationTokenResult}</p> + * <ul> + * <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException} + * If the request sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li> + * <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException} + * if the delegation token feature is disabled.</li> + * <li>{@link org.apache.kafka.common.errors.DelegationTokenNotFoundException} + * if the delegation token is not found on server.</li> + * <li>{@link org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException} + * if the authenticated user is not owner/renewer of the token.</li> + * <li>{@link org.apache.kafka.common.errors.DelegationTokenExpiredException} + * if the delegation token is expired.</li> + * <li>{@link org.apache.kafka.common.errors.TimeoutException} + * if the request was not completed in within the given {@link RenewDelegationTokenOptions#timeoutMs()}.</li> + * </ul> + * + * @param hmac HMAC of the Delegation token + * @param
options The options to use when renewing delegation token. + * @return The RenewDelegationTokenResult. + */ + public abstract RenewDelegationTokenResult renewDelegationToken(byte[] hmac, RenewDelegationTokenOptions options); + + /** + * <p>Expire a Delegation Token.</p> + * + * <p>This is a convenience method for {@link #expireDelegationToken(byte[], ExpireDelegationTokenOptions)} with default options. + * This will expire the token immediately. See the overload for more details.</p> + * + * @param hmac HMAC of the Delegation token + * @return The ExpireDelegationTokenResult. + */ + public ExpireDelegationTokenResult expireDelegationToken(byte[] hmac) { + return expireDelegationToken(hmac, new ExpireDelegationTokenOptions()); + } + + /** + * <p>Expire a Delegation Token.</p> + * + * <p>This operation is supported by brokers with version 1.1.0 or higher.</p> + * + * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the + * {@link ExpireDelegationTokenResult#expiryTimestamp() expiryTimestamp()} method of the returned {@code ExpireDelegationTokenResult}</p> + * <ul> + * <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException} + * If the request sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li> + * <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException} + * if the delegation token feature is disabled.</li> + * <li>{@link org.apache.kafka.common.errors.DelegationTokenNotFoundException} + * if the delegation token is not found on server.</li> + * <li>{@link org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException} + * if the authenticated user is not owner/renewer of the requested token.</li> + * <li>{@link org.apache.kafka.common.errors.DelegationTokenExpiredException} + * if the delegation token is expired.</li> + * <li>{@link org.apache.kafka.common.errors.TimeoutException} + * if the request was not completed in within the 
given {@link ExpireDelegationTokenOptions#timeoutMs()}.</li> + * </ul> + * + * @param hmac HMAC of the Delegation token + * @param options The options to use when expiring delegation token. + * @return The ExpireDelegationTokenResult. + */ + public abstract ExpireDelegationTokenResult expireDelegationToken(byte[] hmac, ExpireDelegationTokenOptions options); + + /** + * <p>Describe the Delegation Tokens.</p> + * + * <p>This is a convenience method for {@link #describeDelegationToken(DescribeDelegationTokenOptions)} with default options. + * This will return all the user owned tokens and tokens where user has Describe permission. See the overload for more details.</p> + * + * @return The DescribeDelegationTokenResult. + */ + public DescribeDelegationTokenResult describeDelegationToken() { + return describeDelegationToken(new DescribeDelegationTokenOptions()); + } + + /** + * <p>Describe the Delegation Tokens.</p> + * + * <p>This operation is supported by brokers with version 1.1.0 or higher.</p> + * + * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the + * {@link DescribeDelegationTokenResult#delegationTokens() delegationTokens()} method of the returned {@code DescribeDelegationTokenResult}</p> + * <ul> + * <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException} + * If the request sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li> + * <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException} + * if the delegation token feature is disabled.</li> + * <li>{@link org.apache.kafka.common.errors.TimeoutException} + * if the request was not completed in within the given {@link DescribeDelegationTokenOptions#timeoutMs()}.</li> + * </ul> + * + * @param options The options to use when describing delegation tokens. + * @return The DescribeDelegationTokenResult.
+ */ + public abstract DescribeDelegationTokenResult describeDelegationToken(DescribeDelegationTokenOptions options); + } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenOptions.java new file mode 100644 index 0000000..1b77b94 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenOptions.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import java.util.LinkedList; +import java.util.List; + +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.security.auth.KafkaPrincipal; + +/** + * Options for {@link AdminClient#createDelegationToken(CreateDelegationTokenOptions)}. + * + * The API of this class is evolving, see {@link AdminClient} for details. 
+ */ +@InterfaceStability.Evolving +public class CreateDelegationTokenOptions extends AbstractOptions<CreateDelegationTokenOptions> { + private long maxLifeTimeMs = -1; + private List<KafkaPrincipal> renewers = new LinkedList<>(); + + public CreateDelegationTokenOptions renewers(List<KafkaPrincipal> renewers) { + this.renewers = renewers; + return this; + } + + public List<KafkaPrincipal> renewers() { + return renewers; + } + + public CreateDelegationTokenOptions maxlifeTimeMs(long maxLifeTimeMs) { + this.maxLifeTimeMs = maxLifeTimeMs; + return this; + } + + public long maxlifeTimeMs() { + return maxLifeTimeMs; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenResult.java new file mode 100644 index 0000000..043cbe8 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenResult.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.security.token.delegation.DelegationToken; + +/** + * The result of the {@link KafkaAdminClient#createDelegationToken(CreateDelegationTokenOptions)} call. + * + * The API of this class is evolving, see {@link AdminClient} for details. + */ +@InterfaceStability.Evolving +public class CreateDelegationTokenResult { + private final KafkaFuture<DelegationToken> delegationToken; + + CreateDelegationTokenResult(KafkaFuture<DelegationToken> delegationToken) { + this.delegationToken = delegationToken; + } + + /** + * Returns a future which yields a delegation token + */ + public KafkaFuture<DelegationToken> delegationToken() { + return delegationToken; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenOptions.java new file mode 100644 index 0000000..60b9935 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenOptions.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import java.util.List; + +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.security.auth.KafkaPrincipal; + +/** + * Options for {@link AdminClient#describeDelegationToken(DescribeDelegationTokenOptions)}. + * + * The API of this class is evolving, see {@link AdminClient} for details. + */ +@InterfaceStability.Evolving +public class DescribeDelegationTokenOptions extends AbstractOptions<DescribeDelegationTokenOptions> { + private List<KafkaPrincipal> owners; + + /** + * if owners is null, all the user owned tokens and tokens where user have Describe permission + * will be returned. + * @param owners + * @return this instance + */ + public DescribeDelegationTokenOptions owners(List<KafkaPrincipal> owners) { + this.owners = owners; + return this; + } + + public List<KafkaPrincipal> owners() { + return owners; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenResult.java new file mode 100644 index 0000000..7a9d4b9 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenResult.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import java.util.List; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.security.token.delegation.DelegationToken; + +/** + * The result of the {@link KafkaAdminClient#describeDelegationToken(DescribeDelegationTokenOptions)} call. + * + * The API of this class is evolving, see {@link AdminClient} for details. + */ +@InterfaceStability.Evolving +public class DescribeDelegationTokenResult { + private final KafkaFuture<List<DelegationToken>> delegationTokens; + + DescribeDelegationTokenResult(KafkaFuture<List<DelegationToken>> delegationTokens) { + this.delegationTokens = delegationTokens; + } + + /** + * Returns a future which yields list of delegation tokens + */ + public KafkaFuture<List<DelegationToken>> delegationTokens() { + return delegationTokens; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java b/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java similarity index 54% copy from clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java copy to clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java index 7490a3e..138cd4e 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java +++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java @@ -14,18 +14,26 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.common.security.token.delegation; -import org.apache.kafka.common.security.scram.ScramCredentialCallback; +package org.apache.kafka.clients.admin; -public class DelegationTokenCredentialCallback extends ScramCredentialCallback { - private String tokenOwner; +import org.apache.kafka.common.annotation.InterfaceStability; - public void tokenOwner(String tokenOwner) { - this.tokenOwner = tokenOwner; +/** + * Options for {@link AdminClient#expireDelegationToken(byte[], ExpireDelegationTokenOptions)}. + * + * The API of this class is evolving, see {@link AdminClient} for details. + */ +@InterfaceStability.Evolving +public class ExpireDelegationTokenOptions extends AbstractOptions<ExpireDelegationTokenOptions> { + private long expiryTimePeriodMs = -1L; + + public ExpireDelegationTokenOptions expiryTimePeriodMs(long expiryTimePeriodMs) { + this.expiryTimePeriodMs = expiryTimePeriodMs; + return this; } - public String tokenOwner() { - return tokenOwner; + public long expiryTimePeriodMs() { + return expiryTimePeriodMs; } -} \ No newline at end of file +} diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java b/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenResult.java similarity index 52% copy from clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java copy to clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenResult.java index 7490a3e..41782bd 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenResult.java @@ -14,18 
+14,29 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.common.security.token.delegation; -import org.apache.kafka.common.security.scram.ScramCredentialCallback; +package org.apache.kafka.clients.admin; -public class DelegationTokenCredentialCallback extends ScramCredentialCallback { - private String tokenOwner; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; - public void tokenOwner(String tokenOwner) { - this.tokenOwner = tokenOwner; +/** + * The result of the {@link KafkaAdminClient#expireDelegationToken(byte[], ExpireDelegationTokenOptions)} call. + * + * The API of this class is evolving, see {@link AdminClient} for details. + */ +@InterfaceStability.Evolving +public class ExpireDelegationTokenResult { + private final KafkaFuture<Long> expiryTimestamp; + + ExpireDelegationTokenResult(KafkaFuture<Long> expiryTimestamp) { + this.expiryTimestamp = expiryTimestamp; } - public String tokenOwner() { - return tokenOwner; + /** + * Returns a future which yields expiry timestamp + */ + public KafkaFuture<Long> expiryTimestamp() { + return expiryTimestamp; } -} \ No newline at end of file +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 5118953..3ac0e28 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -69,6 +69,8 @@ import org.apache.kafka.common.requests.CreateAclsRequest; import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation; import org.apache.kafka.common.requests.CreateAclsResponse; import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse; +import org.apache.kafka.common.requests.CreateDelegationTokenRequest; +import 
org.apache.kafka.common.requests.CreateDelegationTokenResponse; import org.apache.kafka.common.requests.CreatePartitionsRequest; import org.apache.kafka.common.requests.CreatePartitionsResponse; import org.apache.kafka.common.requests.CreateTopicsRequest; @@ -85,12 +87,20 @@ import org.apache.kafka.common.requests.DescribeAclsRequest; import org.apache.kafka.common.requests.DescribeAclsResponse; import org.apache.kafka.common.requests.DescribeConfigsRequest; import org.apache.kafka.common.requests.DescribeConfigsResponse; +import org.apache.kafka.common.requests.DescribeDelegationTokenRequest; +import org.apache.kafka.common.requests.DescribeDelegationTokenResponse; import org.apache.kafka.common.requests.DescribeLogDirsRequest; import org.apache.kafka.common.requests.DescribeLogDirsResponse; +import org.apache.kafka.common.requests.ExpireDelegationTokenRequest; +import org.apache.kafka.common.requests.ExpireDelegationTokenResponse; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.RenewDelegationTokenRequest; +import org.apache.kafka.common.requests.RenewDelegationTokenResponse; import org.apache.kafka.common.requests.Resource; import org.apache.kafka.common.requests.ResourceType; +import org.apache.kafka.common.security.token.delegation.DelegationToken; +import org.apache.kafka.common.security.token.delegation.TokenInformation; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.LogContext; @@ -2072,4 +2082,131 @@ public class KafkaAdminClient extends AdminClient { return new DeleteRecordsResult(new HashMap<TopicPartition, KafkaFuture<DeletedRecords>>(futures)); } + + @Override + public CreateDelegationTokenResult createDelegationToken(final CreateDelegationTokenOptions options) { + final KafkaFutureImpl<DelegationToken> delegationTokenFuture = new KafkaFutureImpl<>(); + 
final long now = time.milliseconds(); + runnable.call(new Call("createDelegationToken", calcDeadlineMs(now, options.timeoutMs()), + new LeastLoadedNodeProvider()) { + + @Override + AbstractRequest.Builder createRequest(int timeoutMs) { + return new CreateDelegationTokenRequest.Builder(options.renewers(), options.maxlifeTimeMs()); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + CreateDelegationTokenResponse response = (CreateDelegationTokenResponse) abstractResponse; + if (response.hasError()) { + delegationTokenFuture.completeExceptionally(response.error().exception()); + } else { + TokenInformation tokenInfo = new TokenInformation(response.tokenId(), response.owner(), + options.renewers(), response.issueTimestamp(), response.maxTimestamp(), response.expiryTimestamp()); + DelegationToken token = new DelegationToken(tokenInfo, response.hmacBytes()); + delegationTokenFuture.complete(token); + } + } + + @Override + void handleFailure(Throwable throwable) { + delegationTokenFuture.completeExceptionally(throwable); + } + }, now); + + return new CreateDelegationTokenResult(delegationTokenFuture); + } + + @Override + public RenewDelegationTokenResult renewDelegationToken(final byte[] hmac, final RenewDelegationTokenOptions options) { + final KafkaFutureImpl<Long> expiryTimeFuture = new KafkaFutureImpl<>(); + final long now = time.milliseconds(); + runnable.call(new Call("renewDelegationToken", calcDeadlineMs(now, options.timeoutMs()), + new LeastLoadedNodeProvider()) { + + @Override + AbstractRequest.Builder createRequest(int timeoutMs) { + return new RenewDelegationTokenRequest.Builder(hmac, options.renewTimePeriodMs()); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + RenewDelegationTokenResponse response = (RenewDelegationTokenResponse) abstractResponse; + if (response.hasError()) { + expiryTimeFuture.completeExceptionally(response.error().exception()); + } else { + 
expiryTimeFuture.complete(response.expiryTimestamp()); + } + } + + @Override + void handleFailure(Throwable throwable) { + expiryTimeFuture.completeExceptionally(throwable); + } + }, now); + + return new RenewDelegationTokenResult(expiryTimeFuture); + } + + @Override + public ExpireDelegationTokenResult expireDelegationToken(final byte[] hmac, final ExpireDelegationTokenOptions options) { + final KafkaFutureImpl<Long> expiryTimeFuture = new KafkaFutureImpl<>(); + final long now = time.milliseconds(); + runnable.call(new Call("expireDelegationToken", calcDeadlineMs(now, options.timeoutMs()), + new LeastLoadedNodeProvider()) { + + @Override + AbstractRequest.Builder createRequest(int timeoutMs) { + return new ExpireDelegationTokenRequest.Builder(hmac, options.expiryTimePeriodMs()); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + ExpireDelegationTokenResponse response = (ExpireDelegationTokenResponse) abstractResponse; + if (response.hasError()) { + expiryTimeFuture.completeExceptionally(response.error().exception()); + } else { + expiryTimeFuture.complete(response.expiryTimestamp()); + } + } + + @Override + void handleFailure(Throwable throwable) { + expiryTimeFuture.completeExceptionally(throwable); + } + }, now); + + return new ExpireDelegationTokenResult(expiryTimeFuture); + } + + @Override + public DescribeDelegationTokenResult describeDelegationToken(final DescribeDelegationTokenOptions options) { + final KafkaFutureImpl<List<DelegationToken>> tokensFuture = new KafkaFutureImpl<>(); + final long now = time.milliseconds(); + runnable.call(new Call("describeDelegationToken", calcDeadlineMs(now, options.timeoutMs()), + new LeastLoadedNodeProvider()) { + + @Override + AbstractRequest.Builder createRequest(int timeoutMs) { + return new DescribeDelegationTokenRequest.Builder(options.owners()); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + DescribeDelegationTokenResponse response = 
(DescribeDelegationTokenResponse) abstractResponse; + if (response.hasError()) { + tokensFuture.completeExceptionally(response.error().exception()); + } else { + tokensFuture.complete(response.tokens()); + } + } + + @Override + void handleFailure(Throwable throwable) { + tokensFuture.completeExceptionally(throwable); + } + }, now); + + return new DescribeDelegationTokenResult(tokensFuture); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java b/clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenOptions.java similarity index 54% copy from clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java copy to clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenOptions.java index 7490a3e..238dc4a 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenOptions.java @@ -14,18 +14,26 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.common.security.token.delegation; -import org.apache.kafka.common.security.scram.ScramCredentialCallback; +package org.apache.kafka.clients.admin; -public class DelegationTokenCredentialCallback extends ScramCredentialCallback { - private String tokenOwner; +import org.apache.kafka.common.annotation.InterfaceStability; - public void tokenOwner(String tokenOwner) { - this.tokenOwner = tokenOwner; +/** + * Options for {@link AdminClient#renewDelegationToken(byte[], RenewDelegationTokenOptions)}. + * + * The API of this class is evolving, see {@link AdminClient} for details. 
+ */ +@InterfaceStability.Evolving +public class RenewDelegationTokenOptions extends AbstractOptions<RenewDelegationTokenOptions> { + private long renewTimePeriodMs = -1; + + public RenewDelegationTokenOptions renewTimePeriodMs(long renewTimePeriodMs) { + this.renewTimePeriodMs = renewTimePeriodMs; + return this; } - public String tokenOwner() { - return tokenOwner; + public long renewTimePeriodMs() { + return renewTimePeriodMs; } -} \ No newline at end of file +} diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java b/clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenResult.java similarity index 52% copy from clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java copy to clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenResult.java index 7490a3e..38cdf1a 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenResult.java @@ -14,18 +14,29 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.common.security.token.delegation; -import org.apache.kafka.common.security.scram.ScramCredentialCallback; +package org.apache.kafka.clients.admin; -public class DelegationTokenCredentialCallback extends ScramCredentialCallback { - private String tokenOwner; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; - public void tokenOwner(String tokenOwner) { - this.tokenOwner = tokenOwner; +/** + * The result of the {@link KafkaAdminClient#renewDelegationToken(byte[], RenewDelegationTokenOptions)} call. + * + * The API of this class is evolving, see {@link AdminClient} for details.
+ */ +@InterfaceStability.Evolving +public class RenewDelegationTokenResult { + private final KafkaFuture<Long> expiryTimestamp; + + RenewDelegationTokenResult(KafkaFuture<Long> expiryTimestamp) { + this.expiryTimestamp = expiryTimestamp; } - public String tokenOwner() { - return tokenOwner; + /** + * Returns a future which yields expiry timestamp + */ + public KafkaFuture<Long> expiryTimestamp() { + return expiryTimestamp; } -} \ No newline at end of file +} diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java index 80ccb7e..078d844 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java @@ -26,7 +26,7 @@ import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuild import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder; import org.apache.kafka.common.security.authenticator.CredentialCache; import org.apache.kafka.common.security.kerberos.KerberosShortNamer; -import org.apache.kafka.common.security.token.delegation.DelegationTokenCache; +import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache; import org.apache.kafka.common.utils.Utils; import java.util.Collections; diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java index 5502164..3985c7e 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java @@ -41,7 +41,7 @@ import org.apache.kafka.common.security.scram.ScramCredential; import org.apache.kafka.common.security.scram.internal.ScramMechanism; import org.apache.kafka.common.security.scram.internal.ScramServerCallbackHandler; import 
org.apache.kafka.common.security.ssl.SslFactory; -import org.apache.kafka.common.security.token.delegation.DelegationTokenCache; +import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache; import org.apache.kafka.common.utils.Java; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java index dba29ea..7ba270a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java @@ -184,4 +184,8 @@ public class DescribeDelegationTokenResponse extends AbstractResponse { public List<DelegationToken> tokens() { return tokens; } + + public boolean hasError() { + return this.error != Errors.NONE; + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java index 0d43440..40f0aad 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java @@ -88,9 +88,9 @@ public class ExpireDelegationTokenRequest extends AbstractRequest { private final ByteBuffer hmac; private final long expiryTimePeriod; - public Builder(ByteBuffer hmac, long expiryTimePeriod) { + public Builder(byte[] hmac, long expiryTimePeriod) { super(ApiKeys.EXPIRE_DELEGATION_TOKEN); - this.hmac = hmac; + this.hmac = ByteBuffer.wrap(hmac); this.expiryTimePeriod = expiryTimePeriod; } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java 
index f7e0ec4..1a673bc 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java @@ -93,4 +93,8 @@ public class ExpireDelegationTokenResponse extends AbstractResponse { public int throttleTimeMs() { return throttleTimeMs; } + + public boolean hasError() { + return this.error != Errors.NONE; + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java index 4a4b762..a65c705 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java @@ -88,9 +88,9 @@ public class RenewDelegationTokenRequest extends AbstractRequest { private final ByteBuffer hmac; private final long renewTimePeriod; - public Builder(ByteBuffer hmac, long renewTimePeriod) { + public Builder(byte[] hmac, long renewTimePeriod) { super(ApiKeys.RENEW_DELEGATION_TOKEN); - this.hmac = hmac; + this.hmac = ByteBuffer.wrap(hmac); this.renewTimePeriod = renewTimePeriod; } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java index 1885b48..3233f5c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java @@ -93,4 +93,8 @@ public class RenewDelegationTokenResponse extends AbstractResponse { public long expiryTimestamp() { return expiryTimestamp; } + + public boolean hasError() { + return this.error != Errors.NONE; + } } diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramSaslServer.java 
b/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramSaslServer.java index deee0b8..6085727 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramSaslServer.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramSaslServer.java @@ -40,7 +40,7 @@ import org.apache.kafka.common.security.scram.internal.ScramMessages.ClientFinal import org.apache.kafka.common.security.scram.internal.ScramMessages.ClientFirstMessage; import org.apache.kafka.common.security.scram.internal.ScramMessages.ServerFinalMessage; import org.apache.kafka.common.security.scram.internal.ScramMessages.ServerFirstMessage; -import org.apache.kafka.common.security.token.delegation.DelegationTokenCredentialCallback; +import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCredentialCallback; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramServerCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramServerCallbackHandler.java index 377aa3d..9a3f0dc 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramServerCallbackHandler.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramServerCallbackHandler.java @@ -29,8 +29,8 @@ import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; import org.apache.kafka.common.security.authenticator.CredentialCache; import org.apache.kafka.common.security.scram.ScramCredential; import org.apache.kafka.common.security.scram.ScramCredentialCallback; -import org.apache.kafka.common.security.token.delegation.DelegationTokenCache; -import org.apache.kafka.common.security.token.delegation.DelegationTokenCredentialCallback; +import 
org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache; +import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCredentialCallback; public class ScramServerCallbackHandler implements AuthenticateCallbackHandler { diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationToken.java b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationToken.java index 05ccbda..e1f97c1 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationToken.java +++ b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationToken.java @@ -16,11 +16,16 @@ */ package org.apache.kafka.common.security.token.delegation; +import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.utils.Base64; -import java.nio.ByteBuffer; import java.util.Arrays; +/** + * A class representing a delegation token. + * + */ +@InterfaceStability.Evolving public class DelegationToken { private TokenInformation tokenInformation; private byte[] hmac; @@ -42,10 +47,6 @@ public class DelegationToken { return Base64.encoder().encodeToString(hmac); } - public ByteBuffer hmacBuffer() { - return ByteBuffer.wrap(hmac); - } - @Override public boolean equals(Object o) { if (this == o) { diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java index 1d500d2..ffd2af3 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java +++ b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java @@ -16,11 +16,17 @@ */ package org.apache.kafka.common.security.token.delegation; +import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.security.auth.KafkaPrincipal; import 
java.util.ArrayList; import java.util.Collection; +/** + * A class representing the details of a delegation token. + * + */ +@InterfaceStability.Evolving public class TokenInformation { private KafkaPrincipal owner; diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCache.java b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCache.java similarity index 95% rename from clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCache.java rename to clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCache.java index adea210..c05b735 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCache.java +++ b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCache.java @@ -15,12 +15,14 @@ * limitations under the License. */ -package org.apache.kafka.common.security.token.delegation; +package org.apache.kafka.common.security.token.delegation.internal; import org.apache.kafka.common.security.authenticator.CredentialCache; import org.apache.kafka.common.security.scram.ScramCredential; import org.apache.kafka.common.security.scram.internal.ScramCredentialUtils; import org.apache.kafka.common.security.scram.internal.ScramMechanism; +import org.apache.kafka.common.security.token.delegation.DelegationToken; +import org.apache.kafka.common.security.token.delegation.TokenInformation; import java.util.Collection; import java.util.HashMap; diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCredentialCallback.java similarity index 94% rename from clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java rename to
clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCredentialCallback.java index 7490a3e..294d7b1 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java +++ b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCredentialCallback.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.common.security.token.delegation; +package org.apache.kafka.common.security.token.delegation.internal; import org.apache.kafka.common.security.scram.ScramCredentialCallback; diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index c141a8a..0f5df38 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -277,6 +277,26 @@ public class MockAdminClient extends AdminClient { } @Override + public CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions options) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public RenewDelegationTokenResult renewDelegationToken(byte[] hmac, RenewDelegationTokenOptions options) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public ExpireDelegationTokenResult expireDelegationToken(byte[] hmac, ExpireDelegationTokenOptions options) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public DescribeDelegationTokenResult describeDelegationToken(DescribeDelegationTokenOptions options) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override public CreateAclsResult createAcls(Collection<AclBinding> acls, CreateAclsOptions 
options) { throw new UnsupportedOperationException("Not implemented yet"); } diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java index fab8e93..68979a1 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java +++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java @@ -44,7 +44,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import org.apache.kafka.common.security.token.delegation.DelegationTokenCache; +import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache; /** * Non-blocking EchoServer implementation that uses ChannelBuilder to create channels diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index bdbd106..c63cecd 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -1223,7 +1223,7 @@ public class RequestResponseTest { } private RenewDelegationTokenRequest createRenewTokenRequest() { - return new RenewDelegationTokenRequest.Builder(ByteBuffer.wrap("test".getBytes()), System.currentTimeMillis()).build(); + return new RenewDelegationTokenRequest.Builder("test".getBytes(), System.currentTimeMillis()).build(); } private RenewDelegationTokenResponse createRenewTokenResponse() { @@ -1231,7 +1231,7 @@ public class RequestResponseTest { } private ExpireDelegationTokenRequest createExpireTokenRequest() { - return new ExpireDelegationTokenRequest.Builder(ByteBuffer.wrap("test".getBytes()), System.currentTimeMillis()).build(); + return new ExpireDelegationTokenRequest.Builder("test".getBytes(), System.currentTimeMillis()).build(); } private ExpireDelegationTokenResponse createExpireTokenResponse() { diff 
--git a/clients/src/test/java/org/apache/kafka/common/security/scram/internal/ScramSaslServerTest.java b/clients/src/test/java/org/apache/kafka/common/security/scram/internal/ScramSaslServerTest.java index 3c4b82d..f6e43f9 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/scram/internal/ScramSaslServerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/scram/internal/ScramSaslServerTest.java @@ -23,7 +23,7 @@ import java.util.HashMap; import org.apache.kafka.common.errors.SaslAuthenticationException; import org.apache.kafka.common.security.authenticator.CredentialCache; import org.apache.kafka.common.security.scram.ScramCredential; -import org.apache.kafka.common.security.token.delegation.DelegationTokenCache; +import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache; import org.junit.Before; import org.junit.Test; diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index c010ba0..bcc11fd 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -35,8 +35,6 @@ import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion import org.apache.kafka.common.requests.DescribeGroupsResponse.GroupMetadata import org.apache.kafka.common.requests.OffsetFetchResponse import org.apache.kafka.common.utils.LogContext -import org.apache.kafka.common.security.auth.KafkaPrincipal -import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation} import org.apache.kafka.common.utils.{KafkaThread, Time, Utils} import org.apache.kafka.common.{Cluster, Node, TopicPartition} @@ -342,33 +340,6 @@ class AdminClient(val time: Time, ConsumerGroupSummary(metadata.state, metadata.protocol, Some(consumers), coordinator) } - def createToken(renewers: List[KafkaPrincipal], maxTimePeriodMs: Long = -1): (Errors, DelegationToken) = { - val responseBody = 
sendAnyNode(ApiKeys.CREATE_DELEGATION_TOKEN, new CreateDelegationTokenRequest.Builder(renewers.asJava, maxTimePeriodMs)) - val response = responseBody.asInstanceOf[CreateDelegationTokenResponse] - val tokenInfo = new TokenInformation(response.tokenId, response.owner, renewers.asJava, - response.issueTimestamp, response.maxTimestamp, response.expiryTimestamp) - (response.error, new DelegationToken(tokenInfo, response.hmacBytes)) - } - - def renewToken(hmac: ByteBuffer, renewTimePeriod: Long = -1): (Errors, Long) = { - val responseBody = sendAnyNode(ApiKeys.RENEW_DELEGATION_TOKEN, new RenewDelegationTokenRequest.Builder(hmac, renewTimePeriod)) - val response = responseBody.asInstanceOf[RenewDelegationTokenResponse] - (response.error, response.expiryTimestamp) - } - - def expireToken(hmac: ByteBuffer, expiryTimeStamp: Long = -1): (Errors, Long) = { - val responseBody = sendAnyNode(ApiKeys.EXPIRE_DELEGATION_TOKEN, new ExpireDelegationTokenRequest.Builder(hmac, expiryTimeStamp)) - val response = responseBody.asInstanceOf[ExpireDelegationTokenResponse] - (response.error, response.expiryTimestamp) - } - - def describeToken(owners: List[KafkaPrincipal]): (Errors, List[DelegationToken]) = { - val ownersList = if (owners == null) null else owners.asJava - val responseBody = sendAnyNode(ApiKeys.RENEW_DELEGATION_TOKEN, new DescribeDelegationTokenRequest.Builder(ownersList)) - val response = responseBody.asInstanceOf[DescribeDelegationTokenResponse] - (response.error, response.tokens().asScala.toList) - } - def deleteConsumerGroups(groups: List[String]): Map[String, Errors] = { def coordinatorLookup(group: String): Either[Node, Errors] = { diff --git a/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala b/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala index 6c5d1ce..0e6ea86 100644 --- a/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala +++ b/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala @@ -17,12 +17,13 @@ package kafka.admin 
-import java.nio.ByteBuffer +import java.text.SimpleDateFormat +import java.util -import joptsimple._ +import joptsimple.{ArgumentAcceptingOptionSpec, OptionParser} import kafka.utils.{CommandLineUtils, Exit, Logging} import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.clients.admin.{CreateDelegationTokenOptions, DescribeDelegationTokenOptions, ExpireDelegationTokenOptions, RenewDelegationTokenOptions, AdminClient => JAdminClient} import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.security.token.delegation.DelegationToken import org.apache.kafka.common.utils.{Base64, SecurityUtils, Utils} @@ -71,19 +72,20 @@ object DelegationTokenCommand extends Logging { } } - def createToken(adminClient: AdminClient, opts: DelegationTokenCommandOptions) = { - val renewerPrincipals = getPrincipals(opts, opts.renewPrincipalsOpt) + def createToken(adminClient: JAdminClient, opts: DelegationTokenCommandOptions): DelegationToken = { + val renewerPrincipals = getPrincipals(opts, opts.renewPrincipalsOpt).getOrElse(new util.LinkedList[KafkaPrincipal]()) val maxLifeTimeMs = opts.options.valueOf(opts.maxLifeTimeOpt).longValue println("Calling create token operation with renewers :" + renewerPrincipals +" , max-life-time-period :"+ maxLifeTimeMs) - val response = adminClient.createToken(renewerPrincipals, maxLifeTimeMs) - response match { - case (Errors.NONE, token) => println("Created delegation token with tokenId : %s".format(token.tokenInfo.tokenId)); printToken(List(token)) - case (e, _) => throw new AdminOperationException(e.message) - } + val createDelegationTokenOptions = new CreateDelegationTokenOptions().maxlifeTimeMs(maxLifeTimeMs).renewers(renewerPrincipals) + val createResult = adminClient.createDelegationToken(createDelegationTokenOptions) + val token = createResult.delegationToken().get() + println("Created delegation token with tokenId : 
%s".format(token.tokenInfo.tokenId)); printToken(List(token)) + token } def printToken(tokens: List[DelegationToken]): Unit = { + val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm") print("\n%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format("TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE")) for (token <- tokens) { val tokenInfo = token.tokenInfo @@ -92,56 +94,59 @@ object DelegationTokenCommand extends Logging { token.hmacAsBase64String, tokenInfo.owner, tokenInfo.renewersAsString, - tokenInfo.issueTimestamp, - tokenInfo.expiryTimestamp, - tokenInfo.maxTimestamp)) + dateFormat.format(tokenInfo.issueTimestamp), + dateFormat.format(tokenInfo.expiryTimestamp), + dateFormat.format(tokenInfo.maxTimestamp))) println() } } - private def getPrincipals(opts: DelegationTokenCommandOptions, principalOptionSpec: ArgumentAcceptingOptionSpec[String]): List[KafkaPrincipal] = { + private def getPrincipals(opts: DelegationTokenCommandOptions, principalOptionSpec: ArgumentAcceptingOptionSpec[String]): Option[util.List[KafkaPrincipal]] = { if (opts.options.has(principalOptionSpec)) - opts.options.valuesOf(principalOptionSpec).asScala.map(s => SecurityUtils.parseKafkaPrincipal(s.trim)).toList + Some(opts.options.valuesOf(principalOptionSpec).asScala.map(s => SecurityUtils.parseKafkaPrincipal(s.trim)).toList.asJava) else - List.empty[KafkaPrincipal] + None } - def renewToken(adminClient: AdminClient, opts: DelegationTokenCommandOptions) = { + def renewToken(adminClient: JAdminClient, opts: DelegationTokenCommandOptions): Long = { val hmac = opts.options.valueOf(opts.hmacOpt) val renewTimePeriodMs = opts.options.valueOf(opts.renewTimePeriodOpt).longValue() println("Calling renew token operation with hmac :" + hmac +" , renew-time-period :"+ renewTimePeriodMs) - val response = adminClient.renewToken(ByteBuffer.wrap(Base64.decoder.decode(hmac)), renewTimePeriodMs) - response match { - case (Errors.NONE, expiryTimeStamp) => println("Completed renew 
operation. New expiry timestamp : %s".format(expiryTimeStamp)) - case (e, expiryTimeStamp) => throw new AdminOperationException(e.message) - } + val renewResult = adminClient.renewDelegationToken(Base64.decoder.decode(hmac), new RenewDelegationTokenOptions().renewTimePeriodMs(renewTimePeriodMs)) + val expiryTimeStamp = renewResult.expiryTimestamp().get() + val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm") + println("Completed renew operation. New expiry date : %s".format(dateFormat.format(expiryTimeStamp))) + expiryTimeStamp } - def expireToken(adminClient: AdminClient, opts: DelegationTokenCommandOptions) = { + def expireToken(adminClient: JAdminClient, opts: DelegationTokenCommandOptions): Long = { val hmac = opts.options.valueOf(opts.hmacOpt) val expiryTimePeriodMs = opts.options.valueOf(opts.expiryTimePeriodOpt).longValue() println("Calling expire token operation with hmac :" + hmac +" , expire-time-period : "+ expiryTimePeriodMs) - val response = adminClient.expireToken(ByteBuffer.wrap(Base64.decoder.decode(hmac)), expiryTimePeriodMs) - response match { - case (Errors.NONE, expiryTimeStamp) => println("Completed expire operation. New expiry timestamp : %s".format(expiryTimeStamp)) - case (e, expiryTimeStamp) => throw new AdminOperationException(e.message) - } + val expireResult = adminClient.expireDelegationToken(Base64.decoder.decode(hmac), new ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs)) + val expiryTimeStamp = expireResult.expiryTimestamp().get() + val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm") + println("Completed expire operation. 
New expiry date : %s".format(dateFormat.format(expiryTimeStamp))) + expiryTimeStamp } - def describeToken(adminClient: AdminClient, opts: DelegationTokenCommandOptions) = { + def describeToken(adminClient: JAdminClient, opts: DelegationTokenCommandOptions): List[DelegationToken] = { val ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt) - println("Calling describe token operation for owners :" + ownerPrincipals) - val response = adminClient.describeToken(ownerPrincipals) - response match { - case (Errors.NONE, tokens) => println("Total Number of tokens : %s".format(tokens.size)); printToken(tokens) - case (e, tokens) => throw new AdminOperationException(e.message) - } + if (ownerPrincipals.isEmpty) + println("Calling describe token operation for current user.") + else + println("Calling describe token operation for owners :" + ownerPrincipals.get) + + val describeResult = adminClient.describeDelegationToken(new DescribeDelegationTokenOptions().owners(ownerPrincipals.orNull)) + val tokens = describeResult.delegationTokens().get().asScala.toList + println("Total number of tokens : %s".format(tokens.size)); printToken(tokens) + tokens } - private def createAdminClient(opts: DelegationTokenCommandOptions): AdminClient = { + private def createAdminClient(opts: DelegationTokenCommandOptions): JAdminClient = { val props = Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt)) - AdminClient.create(props) + JAdminClient.create(props) } class DelegationTokenCommandOptions(args: Array[String]) { @@ -157,10 +162,11 @@ object DelegationTokenCommand extends Logging { .withRequiredArg .ofType(classOf[String]) - val createOpt = parser.accepts("create", "Create a new delegation token.") - val renewOpt = parser.accepts("renew", "Renew delegation token.") - val expiryOpt = parser.accepts("expire", "Expire delegation token.") - val describeOpt = 
parser.accepts("describe", "describe delegation tokens.") + val createOpt = parser.accepts("create", "Create a new delegation token. Use --renewer-principal option to pass renewers principals.") + val renewOpt = parser.accepts("renew", "Renew delegation token. Use --renew-time-period option to set renew time period.") + val expiryOpt = parser.accepts("expire", "Expire delegation token. Use --expiry-time-period option to expire the token.") + val describeOpt = parser.accepts("describe", "Describe delegation tokens for the given principals. Use --owner-principal to pass owner/renewer principals." + + " If --owner-principal option is not supplied, all the user owned tokens and tokens where user have Describe permission will be returned.") val ownerPrincipalsOpt = parser.accepts("owner-principal", "owner is a kafka principal. It is should be in principalType:name format.") .withOptionalArg() diff --git a/core/src/main/scala/kafka/security/CredentialProvider.scala b/core/src/main/scala/kafka/security/CredentialProvider.scala index 6f9c252..f208087 100644 --- a/core/src/main/scala/kafka/security/CredentialProvider.scala +++ b/core/src/main/scala/kafka/security/CredentialProvider.scala @@ -24,7 +24,7 @@ import org.apache.kafka.common.security.scram.ScramCredential import org.apache.kafka.common.config.ConfigDef import org.apache.kafka.common.config.ConfigDef._ import org.apache.kafka.common.security.scram.internal.{ScramCredentialUtils, ScramMechanism} -import org.apache.kafka.common.security.token.delegation.DelegationTokenCache +import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache class CredentialProvider(scramMechanisms: Collection[String], val tokenCache: DelegationTokenCache) { diff --git a/core/src/main/scala/kafka/server/DelegationTokenManager.scala b/core/src/main/scala/kafka/server/DelegationTokenManager.scala index 4a947a1..62a5e20 100644 --- a/core/src/main/scala/kafka/server/DelegationTokenManager.scala +++ 
b/core/src/main/scala/kafka/server/DelegationTokenManager.scala @@ -31,7 +31,8 @@ import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.security.scram.internal.{ScramFormatter, ScramMechanism} import org.apache.kafka.common.security.scram.ScramCredential -import org.apache.kafka.common.security.token.delegation.{DelegationToken, DelegationTokenCache, TokenInformation} +import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache +import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation} import org.apache.kafka.common.utils.{Base64, Sanitizer, SecurityUtils, Time} import scala.collection.JavaConverters._ diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 7105688..a0d2c79 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -45,7 +45,7 @@ import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse} import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.scram.internal.ScramMechanism -import org.apache.kafka.common.security.token.delegation.DelegationTokenCache +import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache import org.apache.kafka.common.security.{JaasContext, JaasUtils} import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time} import org.apache.kafka.common.{ClusterResource, Node} diff --git a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala index 27a6d11..56a3b8a 100644 --- a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala +++ 
b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala @@ -18,10 +18,9 @@ package kafka.api import java.util -import kafka.admin.AdminClient import kafka.server.KafkaConfig import kafka.utils.{JaasTestUtils, TestUtils, ZkUtils} -import org.apache.kafka.clients.admin.AdminClientConfig +import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig} import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.scram.internal.ScramMechanism @@ -83,9 +82,8 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest val clientLoginContext = jaasClientLoginModule(kafkaClientSaslMechanism) config.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext) - val adminClient = AdminClient.create(config.asScala.toMap) - val (error, token) = adminClient.createToken(List()) - + val adminClient = AdminClient.create(config) + val token = adminClient.createDelegationToken().delegationToken().get() //wait for token to reach all the brokers TestUtils.waitUntilTrue(() => servers.forall(server => !server.tokenCache.tokens().isEmpty), "Timed out waiting for token to propagate to all servers") diff --git a/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala b/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala new file mode 100644 index 0000000..6ae8f5e --- /dev/null +++ b/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin + +import java.util + +import kafka.admin.DelegationTokenCommand.DelegationTokenCommandOptions +import kafka.api.{KafkaSasl, SaslSetup} +import kafka.server.{BaseRequestTest, KafkaConfig} +import kafka.utils.{JaasTestUtils, TestUtils} +import org.apache.kafka.clients.admin.AdminClientConfig +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.junit.Assert._ +import org.junit.{After, Before, Test} + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer +import scala.concurrent.ExecutionException + +class DelegationTokenCommandTest extends BaseRequestTest with SaslSetup { + override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT + private val kafkaClientSaslMechanism = "PLAIN" + private val kafkaServerSaslMechanisms = List("PLAIN") + protected override val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism)) + protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism)) + var adminClient: org.apache.kafka.clients.admin.AdminClient = null + + override def numBrokers = 1 + + @Before + override def setUp(): Unit = { + startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName)) + super.setUp() + } + + override def generateConfigs = { + val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, + enableControlledShutdown = false, enableDeleteTopic = true, + interBrokerSecurityProtocol = 
Some(securityProtocol), + trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, enableToken = true) + props.foreach(propertyOverrides) + props.map(KafkaConfig.fromProps) + } + + private def createAdminConfig():util.Map[String, Object] = { + val config = new util.HashMap[String, Object] + config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + val securityProps: util.Map[Object, Object] = + TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties) + securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) } + config + } + + @Test + def testDelegationTokenRequests(): Unit = { + adminClient = org.apache.kafka.clients.admin.AdminClient.create(createAdminConfig) + val renewer1 = "User:renewer1" + val renewer2 = "User:renewer2" + + // create token1 with renewer1 + val tokenCreated = DelegationTokenCommand.createToken(adminClient, getCreateOpts(List(renewer1))) + + var tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List())) + assertTrue(tokens.size == 1) + val token1 = tokens.head + assertEquals(token1, tokenCreated) + + // create token2 with renewer2 + val token2 = DelegationTokenCommand.createToken(adminClient, getCreateOpts(List(renewer2))) + + tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List())) + assertTrue(tokens.size == 2) + assertEquals(Set(token1, token2), tokens.toSet) + + //get tokens for renewer2 + tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List(renewer2))) + assertTrue(tokens.size == 1) + assertEquals(Set(token2), tokens.toSet) + + //test renewing tokens + val expiryTimestamp = DelegationTokenCommand.renewToken(adminClient, getRenewOpts(token1.hmacAsBase64String())) + val renewedToken = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List(renewer1))).head + assertEquals(expiryTimestamp, renewedToken.tokenInfo().expiryTimestamp()) + + //test expire 
tokens + DelegationTokenCommand.expireToken(adminClient, getExpireOpts(token1.hmacAsBase64String())) + DelegationTokenCommand.expireToken(adminClient, getExpireOpts(token2.hmacAsBase64String())) + + tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List())) + assertTrue(tokens.size == 0) + + //create token with invalid renewer principal type + intercept[ExecutionException](DelegationTokenCommand.createToken(adminClient, getCreateOpts(List("Group:Renewer3")))) + + // try describing tokens for unknown owner + assertTrue(DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List("User:Unknown"))).isEmpty) + } + + private def getCreateOpts(renewers: List[String]): DelegationTokenCommandOptions = { + val opts = ListBuffer("--bootstrap-server", brokerList, "--max-life-time-period", "-1", + "--command-config", "testfile", "--create") + renewers.foreach(renewer => opts ++= ListBuffer("--renewer-principal", renewer)) + new DelegationTokenCommandOptions(opts.toArray) + } + + private def getDescribeOpts(owners: List[String]): DelegationTokenCommandOptions = { + val opts = ListBuffer("--bootstrap-server", brokerList, "--command-config", "testfile", "--describe") + owners.foreach(owner => opts ++= ListBuffer("--owner-principal", owner)) + new DelegationTokenCommandOptions(opts.toArray) + } + + private def getRenewOpts(hmac: String): DelegationTokenCommandOptions = { + val opts = Array("--bootstrap-server", brokerList, "--command-config", "testfile", "--renew", + "--renew-time-period", "-1", + "--hmac", hmac) + new DelegationTokenCommandOptions(opts) + } + + private def getExpireOpts(hmac: String): DelegationTokenCommandOptions = { + val opts = Array("--bootstrap-server", brokerList, "--command-config", "testfile", "--expire", + "--expiry-time-period", "-1", + "--hmac", hmac) + new DelegationTokenCommandOptions(opts) + } + + @After + override def tearDown(): Unit = { + if (adminClient != null) + adminClient.close() + super.tearDown() + 
closeSasl() + } +} diff --git a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala index b8388b4..6093622 100644 --- a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala +++ b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala @@ -30,7 +30,8 @@ import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.security.scram.internal.ScramMechanism -import org.apache.kafka.common.security.token.delegation.{DelegationToken, DelegationTokenCache, TokenInformation} +import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache +import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation} import org.apache.kafka.common.utils.{MockTime, SecurityUtils} import org.junit.Assert._ import org.junit.{After, Before, Test} diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala index 4c42dd2..3d4be53 100644 --- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala @@ -19,14 +19,13 @@ package kafka.server import java.nio.ByteBuffer import java.util -import kafka.admin.AdminClient import kafka.utils.TestUtils -import org.apache.kafka.clients.admin.AdminClientConfig -import org.apache.kafka.common.protocol.Errors -import org.junit.Assert._ +import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig} +import org.apache.kafka.common.errors.UnsupportedByAuthenticationException import org.junit.{After, Before, Test} import scala.collection.JavaConverters._ +import 
scala.concurrent.ExecutionException class DelegationTokenRequestsOnPlainTextTest extends BaseRequestTest { var adminClient: AdminClient = null @@ -49,21 +48,19 @@ class DelegationTokenRequestsOnPlainTextTest extends BaseRequestTest { @Test def testDelegationTokenRequests(): Unit = { - adminClient = AdminClient.create(createAdminConfig.asScala.toMap) + adminClient = AdminClient.create(createAdminConfig) - val createResponse = adminClient.createToken(List()) - assertEquals(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, createResponse._1) + val createResult = adminClient.createDelegationToken() + intercept[ExecutionException](createResult.delegationToken().get()).getCause.isInstanceOf[UnsupportedByAuthenticationException] - val describeResponse = adminClient.describeToken(List()) - assertEquals(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, describeResponse._1) + val describeResult = adminClient.describeDelegationToken() + intercept[ExecutionException](describeResult.delegationTokens().get()).getCause.isInstanceOf[UnsupportedByAuthenticationException] - //test renewing tokens - val renewResponse = adminClient.renewToken(ByteBuffer.wrap("".getBytes())) - assertEquals(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, renewResponse._1) + val renewResult = adminClient.renewDelegationToken("".getBytes()) + intercept[ExecutionException](renewResult.expiryTimestamp().get()).getCause.isInstanceOf[UnsupportedByAuthenticationException] - //test expire tokens tokens - val expireResponse = adminClient.expireToken(ByteBuffer.wrap("".getBytes())) - assertEquals(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, expireResponse._1) + val expireResult = adminClient.expireDelegationToken("".getBytes()) + intercept[ExecutionException](expireResult.expiryTimestamp().get()).getCause.isInstanceOf[UnsupportedByAuthenticationException] } diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala index 
55bf5fd..a002750 100644 --- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala @@ -18,17 +18,17 @@ package kafka.server import java.util -import kafka.admin.AdminClient import kafka.api.{KafkaSasl, SaslSetup} import kafka.utils.{JaasTestUtils, TestUtils} -import org.apache.kafka.clients.admin.AdminClientConfig -import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, CreateDelegationTokenOptions, DescribeDelegationTokenOptions} +import org.apache.kafka.common.errors.InvalidPrincipalTypeException import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.SecurityUtils import org.junit.Assert._ import org.junit.{After, Before, Test} import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionException class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup { override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT @@ -46,15 +46,6 @@ class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup { super.setUp() } - def createAdminConfig():util.Map[String, Object] = { - val config = new util.HashMap[String, Object] - config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) - val securityProps: util.Map[Object, Object] = - TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties) - securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) } - config - } - override def generateConfigs = { val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, enableControlledShutdown = false, enableDeleteTopic = true, @@ -64,46 +55,73 @@ class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup { props.map(KafkaConfig.fromProps) } + private def createAdminConfig():util.Map[String, Object] = { + val config = new 
util.HashMap[String, Object] + config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + val securityProps: util.Map[Object, Object] = + TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties) + securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) } + config + } + @Test def testDelegationTokenRequests(): Unit = { - adminClient = AdminClient.create(createAdminConfig.asScala.toMap) - - // test creating token - val renewer1 = List(SecurityUtils.parseKafkaPrincipal("User:" + JaasTestUtils.KafkaPlainUser)) - val tokenResult1 = adminClient.createToken(renewer1) - assertEquals(Errors.NONE, tokenResult1._1) - var token1 = adminClient.describeToken(null)._2.head - assertEquals(token1, tokenResult1._2) + adminClient = AdminClient.create(createAdminConfig) + + // create token1 with renewer1 + val renewer1 = List(SecurityUtils.parseKafkaPrincipal("User:renewer1")).asJava + val createResult1 = adminClient.createDelegationToken(new CreateDelegationTokenOptions().renewers(renewer1)) + val tokenCreated = createResult1.delegationToken().get() + + //test describe token + var tokens = adminClient.describeDelegationToken().delegationTokens().get() + assertTrue(tokens.size() == 1) + var token1 = tokens.get(0) + assertEquals(token1, tokenCreated) + + // create token2 with renewer2 + val renewer2 = List(SecurityUtils.parseKafkaPrincipal("User:renewer2")).asJava + val createResult2 = adminClient.createDelegationToken(new CreateDelegationTokenOptions().renewers(renewer2)) + val token2 = createResult2.delegationToken().get() + + //get all tokens + tokens = adminClient.describeDelegationToken().delegationTokens().get() + assertTrue(tokens.size() == 2) + assertEquals(Set(token1, token2), tokens.asScala.toSet) + + //get tokens for renewer2 + tokens = adminClient.describeDelegationToken(new DescribeDelegationTokenOptions().owners(renewer2)).delegationTokens().get() + assertTrue(tokens.size() == 1) + 
assertEquals(Set(token2), tokens.asScala.toSet) //test renewing tokens - val renewResponse = adminClient.renewToken(token1.hmacBuffer()) - assertEquals(Errors.NONE, renewResponse._1) - - token1 = adminClient.describeToken(null)._2.head - assertEquals(renewResponse._2, token1.tokenInfo().expiryTimestamp()) + val renewResult = adminClient.renewDelegationToken(token1.hmac()) + var expiryTimestamp = renewResult.expiryTimestamp().get() - //test describe tokens - val renewer2 = List(SecurityUtils.parseKafkaPrincipal("User:Renewer1")) - val tokenResult2 = adminClient.createToken(renewer2) - assertEquals(Errors.NONE, tokenResult2._1) - val token2 = tokenResult2._2 + val describeResult = adminClient.describeDelegationToken() + val tokenId = token1.tokenInfo().tokenId() + token1 = describeResult.delegationTokens().get().asScala.filter(dt => dt.tokenInfo().tokenId() == tokenId).head + assertEquals(expiryTimestamp, token1.tokenInfo().expiryTimestamp()) - assertTrue(adminClient.describeToken(null)._2.size == 2) + //test expire tokens + val expireResult1 = adminClient.expireDelegationToken(token1.hmac()) + expiryTimestamp = expireResult1.expiryTimestamp().get() - //test expire tokens tokens - val expireResponse1 = adminClient.expireToken(token1.hmacBuffer()) - assertEquals(Errors.NONE, expireResponse1._1) + val expireResult2 = adminClient.expireDelegationToken(token2.hmac()) + expiryTimestamp = expireResult2.expiryTimestamp().get() - val expireResponse2 = adminClient.expireToken(token2.hmacBuffer()) - assertEquals(Errors.NONE, expireResponse2._1) - - assertTrue(adminClient.describeToken(null)._2.size == 0) + tokens = adminClient.describeDelegationToken().delegationTokens().get() + assertTrue(tokens.size == 0) //create token with invalid principal type - val renewer3 = List(SecurityUtils.parseKafkaPrincipal("Group:Renewer1")) - val tokenResult3 = adminClient.createToken(renewer3) - assertEquals(Errors.INVALID_PRINCIPAL_TYPE, tokenResult3._1) - + val renewer3 = 
List(SecurityUtils.parseKafkaPrincipal("Group:Renewer3")).asJava + val createResult3 = adminClient.createDelegationToken(new CreateDelegationTokenOptions().renewers(renewer3)) + intercept[ExecutionException](createResult3.delegationToken().get()).getCause.isInstanceOf[InvalidPrincipalTypeException] + + // try describing tokens for unknown owner + val unknownOwner = List(SecurityUtils.parseKafkaPrincipal("User:Unknown")).asJava + tokens = adminClient.describeDelegationToken(new DescribeDelegationTokenOptions().owners(unknownOwner)).delegationTokens().get() + assertTrue(tokens.isEmpty) } @After diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala index 0561cac..8f8842b 100644 --- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala @@ -19,17 +19,15 @@ package kafka.server import java.nio.ByteBuffer import java.util -import kafka.admin.AdminClient import kafka.api.{KafkaSasl, SaslSetup} import kafka.utils.{JaasTestUtils, TestUtils} -import org.apache.kafka.clients.admin.AdminClientConfig -import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.utils.SecurityUtils -import org.junit.Assert._ -import org.junit.{After, Before, Test} +import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig} +import org.apache.kafka.common.errors.DelegationTokenDisabledException import org.apache.kafka.common.security.auth.SecurityProtocol +import org.junit.{After, Before, Test} import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionException class DelegationTokenRequestsWithDisableTokenFeatureTest extends BaseRequestTest with SaslSetup { override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT @@ -58,23 +56,19 @@ class 
DelegationTokenRequestsWithDisableTokenFeatureTest extends BaseRequestTest @Test def testDelegationTokenRequests(): Unit = { - adminClient = AdminClient.create(createAdminConfig.asScala.toMap) - - val renewer1 = List(SecurityUtils.parseKafkaPrincipal("User:" + JaasTestUtils.KafkaPlainUser)) - val createResponse = adminClient.createToken(renewer1) - assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, createResponse._1) + adminClient = AdminClient.create(createAdminConfig) - val describeResponse = adminClient.describeToken(List()) - assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, describeResponse._1) + val createResult = adminClient.createDelegationToken() + intercept[ExecutionException](createResult.delegationToken().get()).getCause.isInstanceOf[DelegationTokenDisabledException] - //test renewing tokens - val renewResponse = adminClient.renewToken(ByteBuffer.wrap("".getBytes())) - assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, renewResponse._1) + val describeResult = adminClient.describeDelegationToken() + intercept[ExecutionException](describeResult.delegationTokens().get()).getCause.isInstanceOf[DelegationTokenDisabledException] - //test expire tokens tokens - val expireResponse = adminClient.expireToken(ByteBuffer.wrap("".getBytes())) - assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, expireResponse._1) + val renewResult = adminClient.renewDelegationToken("".getBytes()) + intercept[ExecutionException](renewResult.expiryTimestamp().get()).getCause.isInstanceOf[DelegationTokenDisabledException] + val expireResult = adminClient.expireDelegationToken("".getBytes()) + intercept[ExecutionException](expireResult.expiryTimestamp().get()).getCause.isInstanceOf[DelegationTokenDisabledException] } @After diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 2a7d6d4..ed85415 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ 
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -315,13 +315,13 @@ class RequestQuotaTest extends BaseRequestTest { new CreateDelegationTokenRequest.Builder(Collections.singletonList(SecurityUtils.parseKafkaPrincipal("User:test")), 1000) case ApiKeys.EXPIRE_DELEGATION_TOKEN => - new ExpireDelegationTokenRequest.Builder(ByteBuffer.allocate(10), 1000) + new ExpireDelegationTokenRequest.Builder("".getBytes, 1000) case ApiKeys.DESCRIBE_DELEGATION_TOKEN => new DescribeDelegationTokenRequest.Builder(Collections.singletonList(SecurityUtils.parseKafkaPrincipal("User:test"))) case ApiKeys.RENEW_DELEGATION_TOKEN => - new RenewDelegationTokenRequest.Builder(ByteBuffer.allocate(10), 1000) + new RenewDelegationTokenRequest.Builder("".getBytes, 1000) case ApiKeys.DELETE_GROUPS => new DeleteGroupsRequest.Builder(Collections.singleton("test-group")) -- To stop receiving notification emails like this one, please contact jun...@apache.org.