This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 359ddce3b2c KAFKA-17137 Ensure Admin APIs are properly tested
(token-related) (#16905)
359ddce3b2c is described below
commit 359ddce3b2ce9288389751f5a64a965db9bf12be
Author: TaiJuWu <[email protected]>
AuthorDate: Tue Sep 3 00:22:55 2024 +0800
KAFKA-17137 Ensure Admin APIs are properly tested (token-related) (#16905)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/api/SaslSslAdminIntegrationTest.scala | 41 ++++++++++++++++++++--
1 file changed, 38 insertions(+), 3 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
index d7ba9430990..bda617eaeac 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
@@ -55,13 +55,13 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
JaasTestUtils.KAFKA_SERVER_PRINCIPAL_UNQUALIFIED_NAME)
var superUserAdmin: Admin = _
val secretKey = "secretKey"
+ val maxLifeTime = 5000
override protected def securityProtocol = SecurityProtocol.SASL_SSL
override protected lazy val trustStoreFile =
Some(TestUtils.tempFile("truststore", ".jks"))
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
// set this to use delegation token
-
this.serverConfig.setProperty(DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG,
secretKey)
if (TestInfoUtils.isKRaft(testInfo)) {
this.serverConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG,
kraftAuthorizerClassName)
this.controllerConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG,
kraftAuthorizerClassName)
@@ -75,8 +75,8 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
}
// Enable delegationTokenControlManager
-
serverConfig.setProperty(DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG,
"123")
-
serverConfig.setProperty(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFETIME_CONFIG,
"5000")
+
this.serverConfig.setProperty(DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG,
secretKey)
+
this.serverConfig.setProperty(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFETIME_CONFIG,
maxLifeTime.toString)
setUpSasl()
super.setUp(testInfo)
@@ -415,6 +415,41 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
assertFutureExceptionTypeEquals(results.values.get(emptyResourceNameAcl),
classOf[InvalidRequestException])
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreateDelegationTokenWithLargeTimeout(quorum: String): Unit = {
+ client = createAdminClient
+ val timeout = Long.MaxValue
+
+ val options = new CreateDelegationTokenOptions().maxlifeTimeMs(timeout)
+ val tokenInfo =
client.createDelegationToken(options).delegationToken.get.tokenInfo
+
+ assertEquals(maxLifeTime, tokenInfo.maxTimestamp -
tokenInfo.issueTimestamp)
+ assertTrue(tokenInfo.maxTimestamp >= tokenInfo.expiryTimestamp)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testExpiredTimeStampLargerThanMaxLifeStamp(quorum: String): Unit = {
+ client = createAdminClient
+ val timeout = -1
+
+ val createOptions = new
CreateDelegationTokenOptions().maxlifeTimeMs(timeout)
+ val token = client.createDelegationToken(createOptions).delegationToken.get
+ val tokenInfo = token.tokenInfo
+
+ assertEquals(maxLifeTime, tokenInfo.maxTimestamp -
tokenInfo.issueTimestamp)
+ assertTrue(tokenInfo.maxTimestamp >= tokenInfo.expiryTimestamp)
+
+ TestUtils.waitUntilTrue(() => brokers.forall(server =>
server.tokenCache.tokens.size == 1),
+ "Timed out waiting for token to propagate to all servers")
+
+ val expiredOptions = new
ExpireDelegationTokenOptions().expiryTimePeriodMs(tokenInfo.maxTimestamp + 1)
+ val expiredResult = client.expireDelegationToken(token.hmac,
expiredOptions)
+
+ assertEquals(tokenInfo.maxTimestamp, expiredResult.expiryTimestamp.get())
+ }
+
private def verifyCauseIsClusterAuth(e: Throwable): Unit =
assertEquals(classOf[ClusterAuthorizationException], e.getCause.getClass)
private def testAclCreateGetDelete(expectAuth: Boolean): Unit = {