This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 924c1081dc7 KAFKA-17415 Avoid overflow of expired timestamp (#17026)
924c1081dc7 is described below

commit 924c1081dc78f27947d7ed7f6bbb9d438c5fb5c1
Author: TengYao Chi <[email protected]>
AuthorDate: Mon Oct 7 01:43:43 2024 +0800

    KAFKA-17415 Avoid overflow of expired timestamp (#17026)
    
    Neither ZK nor KRaft mode handles overflow, so setting a large max
    lifetime results in a negative expiry timestamp and a negative max
    timestamp, which is unexpected behavior.
    
    In this PR, we are only fixing the KRaft code since ZK will be removed soon.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/api/SaslSslAdminIntegrationTest.scala    | 32 ++++++++++++++++------
 .../controller/DelegationTokenControlManager.java  | 13 +++++----
 2 files changed, 32 insertions(+), 13 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
index 35c11382ffc..5b6e3fc2c24 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
@@ -53,7 +53,6 @@ class SaslSslAdminIntegrationTest extends 
BaseAdminIntegrationTest with SaslSetu
   val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
JaasTestUtils.KAFKA_SERVER_PRINCIPAL_UNQUALIFIED_NAME)
   var superUserAdmin: Admin = _
   val secretKey = "secretKey"
-  val maxLifeTime = 5000
   override protected def securityProtocol = SecurityProtocol.SASL_SSL
   override protected lazy val trustStoreFile = 
Some(TestUtils.tempFile("truststore", ".jks"))
 
@@ -67,7 +66,8 @@ class SaslSslAdminIntegrationTest extends 
BaseAdminIntegrationTest with SaslSetu
 
     // Enable delegationTokenControlManager
     
this.serverConfig.setProperty(DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG,
 secretKey)
-    
this.serverConfig.setProperty(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFETIME_CONFIG,
 maxLifeTime.toString)
+    
this.serverConfig.setProperty(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_CONFIG,
 Long.MaxValue.toString)
+    
this.serverConfig.setProperty(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFETIME_CONFIG,
 Long.MaxValue.toString)
 
     setUpSasl()
     super.setUp(testInfo)
@@ -408,14 +408,14 @@ class SaslSslAdminIntegrationTest extends 
BaseAdminIntegrationTest with SaslSetu
 
   @ParameterizedTest
   @ValueSource(strings = Array("kraft"))
-  def testCreateDelegationTokenWithLargeTimeout(quorum: String): Unit = {
+  def testCreateDelegationTokenWithSmallerTimeout(quorum: String): Unit = {
     client = createAdminClient
-    val timeout = Long.MaxValue
+    val timeout = 5000
 
     val options = new CreateDelegationTokenOptions().maxlifeTimeMs(timeout)
     val tokenInfo = 
client.createDelegationToken(options).delegationToken.get.tokenInfo
 
-    assertEquals(maxLifeTime, tokenInfo.maxTimestamp - 
tokenInfo.issueTimestamp)
+    assertEquals(timeout, tokenInfo.maxTimestamp - tokenInfo.issueTimestamp)
     assertTrue(tokenInfo.maxTimestamp >= tokenInfo.expiryTimestamp)
   }
 
@@ -423,13 +423,13 @@ class SaslSslAdminIntegrationTest extends 
BaseAdminIntegrationTest with SaslSetu
   @ValueSource(strings = Array("kraft"))
   def testExpiredTimeStampLargerThanMaxLifeStamp(quorum: String): Unit = {
     client = createAdminClient
-    val timeout = -1
+    val timeout = 5000
 
     val createOptions = new 
CreateDelegationTokenOptions().maxlifeTimeMs(timeout)
     val token = client.createDelegationToken(createOptions).delegationToken.get
     val tokenInfo = token.tokenInfo
 
-    assertEquals(maxLifeTime, tokenInfo.maxTimestamp - 
tokenInfo.issueTimestamp)
+    assertEquals(timeout, tokenInfo.maxTimestamp - tokenInfo.issueTimestamp)
     assertTrue(tokenInfo.maxTimestamp >= tokenInfo.expiryTimestamp)
 
     TestUtils.waitUntilTrue(() => brokers.forall(server => 
server.tokenCache.tokens.size == 1),
@@ -633,7 +633,7 @@ class SaslSslAdminIntegrationTest extends 
BaseAdminIntegrationTest with SaslSetu
   @ValueSource(strings = Array("kraft"))
   def testExpireDelegationToken(quorum: String): Unit = {
     client = createAdminClient
-    val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+    val createDelegationTokenOptions = new 
CreateDelegationTokenOptions().maxlifeTimeMs(5000)
 
     // Test expiration for non-exists token
     TestUtils.assertFutureExceptionTypeEquals(
@@ -662,6 +662,22 @@ class SaslSslAdminIntegrationTest extends 
BaseAdminIntegrationTest with SaslSetu
     TestUtils.retry(1000) { assertTrue(expireTokenOrFailWithAssert(token3, 
200) < token3.tokenInfo().expiryTimestamp()) }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft"))
+  def testCreateTokenWithOverflowTimestamp(quorum: String): Unit = {
+    client = createAdminClient
+    val token = client.createDelegationToken(new 
CreateDelegationTokenOptions().maxlifeTimeMs(Long.MaxValue)).delegationToken().get()
+    assertEquals(Long.MaxValue, token.tokenInfo().expiryTimestamp())
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft"))
+  def testExpireTokenWithOverflowTimestamp(quorum: String): Unit = {
+    client = createAdminClient
+    val token = client.createDelegationToken(new 
CreateDelegationTokenOptions().maxlifeTimeMs(Long.MaxValue)).delegationToken().get()
+    TestUtils.retry(1000) { assertTrue(expireTokenOrFailWithAssert(token, 
Long.MaxValue) == Long.MaxValue) }
+  }
+
   private def expireTokenOrFailWithAssert(token: DelegationToken, 
expiryTimePeriodMs: Long): Long  = {
     try {
       client.expireDelegationToken(token.hmac(), new 
ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs))
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java
index 04a69a1e36a..8e002da5030 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java
@@ -188,8 +188,8 @@ public class DelegationTokenControlManager {
             maxLifeTime = Math.min(maxLifeTime, requestData.maxLifetimeMs());
         }
 
-        long maxTimestamp = now + maxLifeTime;
-        long expiryTimestamp = Math.min(maxTimestamp, now + 
tokenDefaultRenewLifetimeMs);
+        long maxTimestamp = sum(now, maxLifeTime);
+        long expiryTimestamp = Math.min(maxTimestamp, sum(now, 
tokenDefaultRenewLifetimeMs));
 
         String tokenId = Uuid.randomUuid().toString();
 
@@ -314,9 +314,8 @@ public class DelegationTokenControlManager {
                 setTokenId(myTokenInformation.tokenId()), (short) 0));
         } else if (myTokenInformation.maxTimestamp() < now || 
myTokenInformation.expiryTimestamp() < now) {
             responseData.setErrorCode(DELEGATION_TOKEN_EXPIRED.code());
-        } else {
-            long expiryTimestamp = Math.min(myTokenInformation.maxTimestamp(),
-                now + requestData.expiryTimePeriodMs());
+        }  else {
+            long expiryTimestamp = Math.min(myTokenInformation.maxTimestamp(), 
sum(now, requestData.expiryTimePeriodMs()));
 
             responseData
                 .setErrorCode(NONE.code())
@@ -354,4 +353,8 @@ public class DelegationTokenControlManager {
     public void replay(RemoveDelegationTokenRecord record) {
         log.info("Replayed RemoveDelegationTokenRecord for {}.", 
record.tokenId());
     }
+
+    private long sum(long now, long duration) {
+        return now > Long.MAX_VALUE - duration ? Long.MAX_VALUE : now + 
duration;
+    }
 }

Reply via email to