This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 359ddce3b2c KAFKA-17137 Ensure Admin APIs are properly tested 
(token-related) (#16905)
359ddce3b2c is described below

commit 359ddce3b2ce9288389751f5a64a965db9bf12be
Author: TaiJuWu <[email protected]>
AuthorDate: Tue Sep 3 00:22:55 2024 +0800

    KAFKA-17137 Ensure Admin APIs are properly tested (token-related) (#16905)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/api/SaslSslAdminIntegrationTest.scala    | 41 ++++++++++++++++++++--
 1 file changed, 38 insertions(+), 3 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
index d7ba9430990..bda617eaeac 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
@@ -55,13 +55,13 @@ class SaslSslAdminIntegrationTest extends 
BaseAdminIntegrationTest with SaslSetu
   val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
JaasTestUtils.KAFKA_SERVER_PRINCIPAL_UNQUALIFIED_NAME)
   var superUserAdmin: Admin = _
   val secretKey = "secretKey"
+  val maxLifeTime = 5000 // value setUp writes to DELEGATION_TOKEN_MAX_LIFETIME_CONFIG; the token tests assert against it
   override protected def securityProtocol = SecurityProtocol.SASL_SSL
   override protected lazy val trustStoreFile = 
Some(TestUtils.tempFile("truststore", ".jks"))
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     // set this to use delegation token
-    
this.serverConfig.setProperty(DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG,
 secretKey)
     if (TestInfoUtils.isKRaft(testInfo)) {
       
this.serverConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, 
kraftAuthorizerClassName)
       
this.controllerConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, 
kraftAuthorizerClassName)
@@ -75,8 +75,8 @@ class SaslSslAdminIntegrationTest extends 
BaseAdminIntegrationTest with SaslSetu
     }
 
     // Enable delegationTokenControlManager
-    
serverConfig.setProperty(DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG,
 "123")
-    
serverConfig.setProperty(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFETIME_CONFIG,
 "5000")
+    
this.serverConfig.setProperty(DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG,
 secretKey)
+    
this.serverConfig.setProperty(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFETIME_CONFIG,
 maxLifeTime.toString)
 
     setUpSasl()
     super.setUp(testInfo)
@@ -415,6 +415,41 @@ class SaslSslAdminIntegrationTest extends 
BaseAdminIntegrationTest with SaslSetu
     assertFutureExceptionTypeEquals(results.values.get(emptyResourceNameAcl), 
classOf[InvalidRequestException])
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateDelegationTokenWithLargeTimeout(quorum: String): Unit = { // requests Long.MaxValue as max lifetime and expects the token lifetime to equal maxLifeTime
+    client = createAdminClient
+    val timeout = Long.MaxValue // requested max lifetime, handed to maxlifeTimeMs below
+
+    val options = new CreateDelegationTokenOptions().maxlifeTimeMs(timeout)
+    val tokenInfo = 
client.createDelegationToken(options).delegationToken.get.tokenInfo
+
+    assertEquals(maxLifeTime, tokenInfo.maxTimestamp - 
tokenInfo.issueTimestamp)
+    assertTrue(tokenInfo.maxTimestamp >= tokenInfo.expiryTimestamp) // expiry must not lie beyond the max timestamp
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testExpiredTimeStampLargerThanMaxLifeStamp(quorum: String): Unit = { // expects an expire request reaching past maxTimestamp to return maxTimestamp
+    client = createAdminClient
+    val timeout = -1 // negative max lifetime request; the assertion below still expects maxLifeTime
+
+    val createOptions = new 
CreateDelegationTokenOptions().maxlifeTimeMs(timeout)
+    val token = client.createDelegationToken(createOptions).delegationToken.get
+    val tokenInfo = token.tokenInfo
+
+    assertEquals(maxLifeTime, tokenInfo.maxTimestamp - 
tokenInfo.issueTimestamp)
+    assertTrue(tokenInfo.maxTimestamp >= tokenInfo.expiryTimestamp)
+
+    TestUtils.waitUntilTrue(() => brokers.forall(server => 
server.tokenCache.tokens.size == 1),
+      "Timed out waiting for token to propagate to all servers") // blocks until every broker's token cache holds exactly one token
+
+    val expiredOptions = new 
ExpireDelegationTokenOptions().expiryTimePeriodMs(tokenInfo.maxTimestamp + 1) // NOTE(review): an absolute timestamp + 1 is passed as a period; presumably just a value large enough to exceed the max - confirm
+    val expiredResult = client.expireDelegationToken(token.hmac, 
expiredOptions)
+
+    assertEquals(tokenInfo.maxTimestamp, expiredResult.expiryTimestamp.get()) // returned expiry is expected to equal the token's maxTimestamp
+  }
+
   private def verifyCauseIsClusterAuth(e: Throwable): Unit = 
assertEquals(classOf[ClusterAuthorizationException], e.getCause.getClass) // compares the direct cause's runtime class for exact equality, so a subclass would not match
 
   private def testAclCreateGetDelete(expectAuth: Boolean): Unit = {

Reply via email to