This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 81b3b2fb339 KAFKA-13771: Support to explicitly delete delegationTokens
that have expired but have not been automatically cleaned up (#11976)
81b3b2fb339 is described below
commit 81b3b2fb3399ab2784eb8158564fc1c9a1299a8d
Author: RivenSun <[email protected]>
AuthorDate: Tue Feb 21 12:37:42 2023 +0800
KAFKA-13771: Support to explicitly delete delegationTokens that have
expired but have not been automatically cleaned up (#11976)
Reviewers: Manikumar Reddy <[email protected]>
---
core/src/main/scala/kafka/server/DelegationTokenManager.scala | 4 ++--
.../security/token/delegation/DelegationTokenManagerTest.scala | 8 ++++++--
2 files changed, 8 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/kafka/server/DelegationTokenManager.scala b/core/src/main/scala/kafka/server/DelegationTokenManager.scala
index 9465197b4ac..a715718c420 100644
--- a/core/src/main/scala/kafka/server/DelegationTokenManager.scala
+++ b/core/src/main/scala/kafka/server/DelegationTokenManager.scala
@@ -416,12 +416,12 @@ class DelegationTokenManager(val config: KafkaConfig,
if (!allowedToRenew(principal, tokenInfo)) {
expireResponseCallback(Errors.DELEGATION_TOKEN_OWNER_MISMATCH,
-1)
- } else if (tokenInfo.maxTimestamp < now ||
tokenInfo.expiryTimestamp < now) {
- expireResponseCallback(Errors.DELEGATION_TOKEN_EXPIRED, -1)
} else if (expireLifeTimeMs < 0) { //expire immediately
removeToken(tokenInfo.tokenId)
info(s"Token expired for token: ${tokenInfo.tokenId} for owner:
${tokenInfo.owner}")
expireResponseCallback(Errors.NONE, now)
+ } else if (tokenInfo.maxTimestamp < now ||
tokenInfo.expiryTimestamp < now) {
+ expireResponseCallback(Errors.DELEGATION_TOKEN_EXPIRED, -1)
} else {
//set expiry time stamp
val expiryTimeStamp = Math.min(tokenInfo.maxTimestamp, now +
expireLifeTimeMs)
diff --git a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
index fa828b166f2..215de8b7962 100644
--- a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
@@ -186,8 +186,12 @@ class DelegationTokenManagerTest extends QuorumTestHarness {
tokenManager.expireToken(owner, ByteBuffer.wrap(password), 2 * 60 * 60 *
1000L, renewResponseCallback)
assertEquals(expectedExpiryStamp, expiryTimeStamp)
- //try expire token immediately
- time.sleep(1 * 60 * 60 * 1000L)
+ //try renewing an expired token
+ time.sleep(8 * 24 * 60 * 60 * 1000L)
+ tokenManager.renewToken(owner, ByteBuffer.wrap(password), -1,
renewResponseCallback)
+ assertEquals(Errors.DELEGATION_TOKEN_EXPIRED, error)
+
+ //try expire token immediately, even if it is an expired token
tokenManager.expireToken(owner, ByteBuffer.wrap(password), -1,
renewResponseCallback)
assert(tokenManager.getToken(tokenId).isEmpty)
assertEquals(Errors.NONE, error)