This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 924c1081dc7 KAFKA-17415 Avoid overflow of expired timestamp (#17026)
924c1081dc7 is described below
commit 924c1081dc78f27947d7ed7f6bbb9d438c5fb5c1
Author: TengYao Chi <[email protected]>
AuthorDate: Mon Oct 7 01:43:43 2024 +0800
KAFKA-17415 Avoid overflow of expired timestamp (#17026)
Neither ZK nor KRaft mode handles overflow when computing token timestamps,
so setting a large max lifetime results in a negative expiry timestamp and a
negative max timestamp, which is unexpected behavior.
In this PR, we are only fixing the KRaft code since ZK will be removed soon.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/api/SaslSslAdminIntegrationTest.scala | 32 ++++++++++++++++------
.../controller/DelegationTokenControlManager.java | 13 +++++----
2 files changed, 32 insertions(+), 13 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
index 35c11382ffc..5b6e3fc2c24 100644
---
a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
@@ -53,7 +53,6 @@ class SaslSslAdminIntegrationTest extends
BaseAdminIntegrationTest with SaslSetu
val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
JaasTestUtils.KAFKA_SERVER_PRINCIPAL_UNQUALIFIED_NAME)
var superUserAdmin: Admin = _
val secretKey = "secretKey"
- val maxLifeTime = 5000
override protected def securityProtocol = SecurityProtocol.SASL_SSL
override protected lazy val trustStoreFile =
Some(TestUtils.tempFile("truststore", ".jks"))
@@ -67,7 +66,8 @@ class SaslSslAdminIntegrationTest extends
BaseAdminIntegrationTest with SaslSetu
// Enable delegationTokenControlManager
this.serverConfig.setProperty(DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG,
secretKey)
-
this.serverConfig.setProperty(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFETIME_CONFIG,
maxLifeTime.toString)
+
this.serverConfig.setProperty(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_CONFIG,
Long.MaxValue.toString)
+
this.serverConfig.setProperty(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFETIME_CONFIG,
Long.MaxValue.toString)
setUpSasl()
super.setUp(testInfo)
@@ -408,14 +408,14 @@ class SaslSslAdminIntegrationTest extends
BaseAdminIntegrationTest with SaslSetu
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
- def testCreateDelegationTokenWithLargeTimeout(quorum: String): Unit = {
+ def testCreateDelegationTokenWithSmallerTimeout(quorum: String): Unit = {
client = createAdminClient
- val timeout = Long.MaxValue
+ val timeout = 5000
val options = new CreateDelegationTokenOptions().maxlifeTimeMs(timeout)
val tokenInfo =
client.createDelegationToken(options).delegationToken.get.tokenInfo
- assertEquals(maxLifeTime, tokenInfo.maxTimestamp -
tokenInfo.issueTimestamp)
+ assertEquals(timeout, tokenInfo.maxTimestamp - tokenInfo.issueTimestamp)
assertTrue(tokenInfo.maxTimestamp >= tokenInfo.expiryTimestamp)
}
@@ -423,13 +423,13 @@ class SaslSslAdminIntegrationTest extends
BaseAdminIntegrationTest with SaslSetu
@ValueSource(strings = Array("kraft"))
def testExpiredTimeStampLargerThanMaxLifeStamp(quorum: String): Unit = {
client = createAdminClient
- val timeout = -1
+ val timeout = 5000
val createOptions = new
CreateDelegationTokenOptions().maxlifeTimeMs(timeout)
val token = client.createDelegationToken(createOptions).delegationToken.get
val tokenInfo = token.tokenInfo
- assertEquals(maxLifeTime, tokenInfo.maxTimestamp -
tokenInfo.issueTimestamp)
+ assertEquals(timeout, tokenInfo.maxTimestamp - tokenInfo.issueTimestamp)
assertTrue(tokenInfo.maxTimestamp >= tokenInfo.expiryTimestamp)
TestUtils.waitUntilTrue(() => brokers.forall(server =>
server.tokenCache.tokens.size == 1),
@@ -633,7 +633,7 @@ class SaslSslAdminIntegrationTest extends
BaseAdminIntegrationTest with SaslSetu
@ValueSource(strings = Array("kraft"))
def testExpireDelegationToken(quorum: String): Unit = {
client = createAdminClient
- val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+ val createDelegationTokenOptions = new
CreateDelegationTokenOptions().maxlifeTimeMs(5000)
// Test expiration for non-exists token
TestUtils.assertFutureExceptionTypeEquals(
@@ -662,6 +662,22 @@ class SaslSslAdminIntegrationTest extends
BaseAdminIntegrationTest with SaslSetu
TestUtils.retry(1000) { assertTrue(expireTokenOrFailWithAssert(token3,
200) < token3.tokenInfo().expiryTimestamp()) }
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("kraft"))
+ def testCreateTokenWithOverflowTimestamp(quorum: String): Unit = {
+ client = createAdminClient
+ val token = client.createDelegationToken(new
CreateDelegationTokenOptions().maxlifeTimeMs(Long.MaxValue)).delegationToken().get()
+ assertEquals(Long.MaxValue, token.tokenInfo().expiryTimestamp())
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kraft"))
+ def testExpireTokenWithOverflowTimestamp(quorum: String): Unit = {
+ client = createAdminClient
+ val token = client.createDelegationToken(new
CreateDelegationTokenOptions().maxlifeTimeMs(Long.MaxValue)).delegationToken().get()
+ TestUtils.retry(1000) { assertTrue(expireTokenOrFailWithAssert(token,
Long.MaxValue) == Long.MaxValue) }
+ }
+
private def expireTokenOrFailWithAssert(token: DelegationToken,
expiryTimePeriodMs: Long): Long = {
try {
client.expireDelegationToken(token.hmac(), new
ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs))
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java
index 04a69a1e36a..8e002da5030 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java
@@ -188,8 +188,8 @@ public class DelegationTokenControlManager {
maxLifeTime = Math.min(maxLifeTime, requestData.maxLifetimeMs());
}
- long maxTimestamp = now + maxLifeTime;
- long expiryTimestamp = Math.min(maxTimestamp, now +
tokenDefaultRenewLifetimeMs);
+ long maxTimestamp = sum(now, maxLifeTime);
+ long expiryTimestamp = Math.min(maxTimestamp, sum(now,
tokenDefaultRenewLifetimeMs));
String tokenId = Uuid.randomUuid().toString();
@@ -314,9 +314,8 @@ public class DelegationTokenControlManager {
setTokenId(myTokenInformation.tokenId()), (short) 0));
} else if (myTokenInformation.maxTimestamp() < now ||
myTokenInformation.expiryTimestamp() < now) {
responseData.setErrorCode(DELEGATION_TOKEN_EXPIRED.code());
- } else {
- long expiryTimestamp = Math.min(myTokenInformation.maxTimestamp(),
- now + requestData.expiryTimePeriodMs());
+ } else {
+ long expiryTimestamp = Math.min(myTokenInformation.maxTimestamp(),
sum(now, requestData.expiryTimePeriodMs()));
responseData
.setErrorCode(NONE.code())
@@ -354,4 +353,8 @@ public class DelegationTokenControlManager {
public void replay(RemoveDelegationTokenRecord record) {
log.info("Replayed RemoveDelegationTokenRecord for {}.",
record.tokenId());
}
+
+ private long sum(long now, long duration) {
+ return now > Long.MAX_VALUE - duration ? Long.MAX_VALUE : now +
duration;
+ }
}