This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a6062d08682 KAFKA-17137 Feat admin client it acl configs (#16648)
a6062d08682 is described below

commit a6062d08682b5459a904d0abd044be1c51b4f981
Author: Eric Chang <[email protected]>
AuthorDate: Sat Aug 31 12:29:39 2024 +0800

    KAFKA-17137 Feat admin client it acl configs (#16648)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../admin/ExpireDelegationTokenOptions.java        |  9 ++-
 .../kafka/api/PlaintextAdminIntegrationTest.scala  | 51 +++++++++++++-
 .../kafka/api/SaslSslAdminIntegrationTest.scala    | 80 +++++++++++++++++++++-
 3 files changed, 135 insertions(+), 5 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java
index 3bf9489a05e..a3f1462f628 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java
@@ -28,7 +28,14 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 public class ExpireDelegationTokenOptions extends 
AbstractOptions<ExpireDelegationTokenOptions> {
     private long expiryTimePeriodMs = -1L;
 
-    public ExpireDelegationTokenOptions expiryTimePeriodMs(long 
expiryTimePeriodMs) {
+    /**
+     * @param expiryTimePeriodMs the time period until we should expire this 
token.
+     * {@code expiryTimePeriodMs} &gt;= 0: the token will update the 
expiration timestamp to min(now + expiryTimePeriodMs, maxTimestamp)
+     * {@code expiryTimePeriodMs} &lt; 0: token will be expired immediately.
+     */
+    public ExpireDelegationTokenOptions expiryTimePeriodMs(
+        long expiryTimePeriodMs
+    ) {
         this.expiryTimePeriodMs = expiryTimePeriodMs;
         return this;
     }
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 9f871279333..c3d0c4de80a 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -96,6 +96,50 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     super.tearDown()
   }
 
+  @ParameterizedTest
+  @Timeout(30)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeConfigWithOptionTimeoutMs(quorum: String): Unit = {
+    val config = createConfig
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
s"localhost:${TestUtils.IncorrectBrokerPort}")
+    val brokenClient = Admin.create(config)
+
+    try {
+      // Describe and broker
+      val brokerResource1 = new ConfigResource(ConfigResource.Type.BROKER, 
brokers(1).config.brokerId.toString)
+      val brokerResource2 = new ConfigResource(ConfigResource.Type.BROKER, 
brokers(2).config.brokerId.toString)
+      val configResources = Seq(brokerResource1, brokerResource2)
+
+      val exception = assertThrows(classOf[ExecutionException], () => {
+        brokenClient.describeConfigs(configResources.asJava,new 
DescribeConfigsOptions().timeoutMs(0)).all().get()
+      })
+      assertInstanceOf(classOf[TimeoutException], exception.getCause)
+    } finally brokenClient.close(time.Duration.ZERO)
+  }
+
+  @ParameterizedTest
+  @Timeout(30)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAlterConfigsWithOptionTimeoutMs(quorum: String): Unit = {
+    client = createAdminClient
+    val config = createConfig
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
s"localhost:${TestUtils.IncorrectBrokerPort}")
+    val brokenClient = Admin.create(config)
+
+    try {
+      val alterLogLevelsEntries = Seq(
+        new ConfigEntry("kafka.controller.KafkaController", 
LogLevelConfig.INFO_LOG_LEVEL)
+      ).asJavaCollection
+
+      val exception = assertThrows(classOf[ExecutionException], () => {
+        brokenClient.alterConfigs(
+        Map(brokerLoggerConfigResource -> new 
Config(alterLogLevelsEntries)).asJava,
+          new AlterConfigsOptions().timeoutMs(0)).all().get()
+      })
+      assertInstanceOf(classOf[TimeoutException], exception.getCause)
+    } finally brokenClient.close(time.Duration.ZERO)
+  }
+
   @ParameterizedTest
   @ValueSource(strings = Array("zk", "kraft"))
   def testCreatePartitionWithOptionRetryOnQuotaViolation(quorum: String): Unit 
= {
@@ -108,8 +152,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
 
     val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> 
clientId).asJava)
     val configEntries = 
Map(QuotaConfigs.CONTROLLER_MUTATION_RATE_OVERRIDE_CONFIG -> 1.0, 
QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 3.0)
-    client.alterClientQuotas(Seq(new ClientQuotaAlteration(entity, 
configEntries.map {case (k, v) =>
-      new 
ClientQuotaAlteration.Op(k,v)}.asJavaCollection)).asJavaCollection).all.get
+    client.alterClientQuotas(Seq(new ClientQuotaAlteration(entity, 
configEntries.map { case (k, v) =>
+      new ClientQuotaAlteration.Op(k, v)
+    }.asJavaCollection)).asJavaCollection).all.get
 
     TestUtils.waitUntilTrue(() => {
       // wait for our ClientQuotaEntity to be set
@@ -118,7 +163,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
 
     val quotaEntities = 
client.describeClientQuotas(ClientQuotaFilter.all()).entities().get()
 
-    assertEquals(configEntries,quotaEntities.get(entity).asScala)
+    assertEquals(configEntries, quotaEntities.get(entity).asScala)
   }
 
   @ParameterizedTest
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
index f08db242944..d7ba9430990 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
@@ -12,6 +12,7 @@
   */
 package kafka.api
 
+import java.time
 import kafka.security.JaasTestUtils
 import kafka.security.authorizer.AclAuthorizer
 import kafka.utils.TestUtils._
@@ -22,11 +23,12 @@ import org.apache.kafka.common.acl._
 import org.apache.kafka.common.acl.AclOperation.{ALL, ALTER, ALTER_CONFIGS, 
CLUSTER_ACTION, CREATE, DELETE, DESCRIBE, IDEMPOTENT_WRITE}
 import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
 import org.apache.kafka.common.config.{ConfigResource, SaslConfigs, 
TopicConfig}
-import org.apache.kafka.common.errors.{ClusterAuthorizationException, 
DelegationTokenExpiredException, DelegationTokenNotFoundException, 
InvalidRequestException, TopicAuthorizationException, 
UnknownTopicOrPartitionException}
+import org.apache.kafka.common.errors.{ClusterAuthorizationException, 
DelegationTokenExpiredException, DelegationTokenNotFoundException, 
InvalidRequestException, TimeoutException, TopicAuthorizationException, 
UnknownTopicOrPartitionException}
 import org.apache.kafka.common.resource.PatternType.LITERAL
 import org.apache.kafka.common.resource.ResourceType.{GROUP, TOPIC}
 import org.apache.kafka.common.resource.{PatternType, Resource, 
ResourcePattern, ResourcePatternFilter, ResourceType}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.common.utils.SecurityUtils
 import org.apache.kafka.common.security.token.delegation.DelegationToken
 import org.apache.kafka.security.authorizer.AclEntry.{WILDCARD_HOST, 
WILDCARD_PRINCIPAL_STRING}
 import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, 
ServerConfigs, ZkConfigs}
@@ -38,6 +40,7 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
 import java.util
+import java.util.Collections
 import scala.collection.Seq
 import scala.compat.java8.OptionConverters._
 import scala.concurrent.ExecutionException
@@ -51,11 +54,14 @@ class SaslSslAdminIntegrationTest extends 
BaseAdminIntegrationTest with SaslSetu
   val kraftAuthorizerClassName = classOf[StandardAuthorizer].getName
   val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
JaasTestUtils.KAFKA_SERVER_PRINCIPAL_UNQUALIFIED_NAME)
   var superUserAdmin: Admin = _
+  val secretKey = "secretKey"
   override protected def securityProtocol = SecurityProtocol.SASL_SSL
   override protected lazy val trustStoreFile = 
Some(TestUtils.tempFile("truststore", ".jks"))
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
+    // set this to use delegation token
+    
this.serverConfig.setProperty(DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG,
 secretKey)
     if (TestInfoUtils.isKRaft(testInfo)) {
       
this.serverConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, 
kraftAuthorizerClassName)
       
this.controllerConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, 
kraftAuthorizerClassName)
@@ -132,6 +138,78 @@ class SaslSslAdminIntegrationTest extends 
BaseAdminIntegrationTest with SaslSetu
   val groupAcl = new AclBinding(new ResourcePattern(ResourceType.GROUP, "*", 
PatternType.LITERAL),
     new AccessControlEntry("User:*", "*", AclOperation.ALL, 
AclPermissionType.ALLOW))
 
+  @ParameterizedTest
+  @Timeout(30)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAclOperationsWithOptionTimeoutMs(quorum: String): Unit = {
+    val config = createConfig
+    // this will cause timeout connecting to broker
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
s"localhost:${TestUtils.IncorrectBrokerPort}")
+    val brokenClient = Admin.create(config)
+
+    try {
+      val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, 
"mytopic3", PatternType.LITERAL),
+      new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, 
AclPermissionType.ALLOW))
+      val exception = assertThrows(classOf[ExecutionException], () => {
+      brokenClient.createAcls(Collections.singleton(acl), new 
CreateAclsOptions().timeoutMs(0)).all().get()
+      })
+      assertInstanceOf(classOf[TimeoutException], exception.getCause)
+    } finally brokenClient.close(time.Duration.ZERO)
+  }
+
+  @ParameterizedTest
+  @Timeout(30)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteAclsWithOptionTimeoutMs(quorum: String): Unit = {
+    val config = createConfig
+    // this will cause timeout connecting to broker
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
s"localhost:${TestUtils.IncorrectBrokerPort}")
+    val brokenClient = Admin.create(config)
+
+    try {
+      val exception = assertThrows(classOf[ExecutionException], () => {
+        brokenClient.deleteAcls(Collections.singleton(AclBindingFilter.ANY), 
new DeleteAclsOptions().timeoutMs(0)).all().get()
+      })
+      assertInstanceOf(classOf[TimeoutException], exception.getCause)
+    } finally brokenClient.close(time.Duration.ZERO)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk","kraft"))
+  def testExpireDelegationTokenWithOptionExpireTimePeriodMs(quorum: String): 
Unit = {
+    client = createAdminClient
+    val renewer = List(SecurityUtils.parseKafkaPrincipal("User:renewer"))
+
+    def generateTokenResult(maxLifeTimeMs: Int, expiryTimePeriodMs: Int, 
expectedTokenNum: Int): (CreateDelegationTokenResult, 
ExpireDelegationTokenResult) = {
+      val createResult = client.createDelegationToken(new 
CreateDelegationTokenOptions().renewers(renewer.asJava).maxlifeTimeMs(maxLifeTimeMs))
+      val tokenCreated = createResult.delegationToken.get
+      TestUtils.waitUntilTrue(() => brokers.forall(server => 
server.tokenCache.tokens().size() == expectedTokenNum),
+            "Timed out waiting for token to propagate to all servers")
+      val expireResult = client.expireDelegationToken(
+        tokenCreated.hmac(),
+        new 
ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs)
+      )
+      (createResult, expireResult)
+    }
+
+    try {
+      // Note that maxTimestamp = token created time + maxLifeTimeMs
+      val (createResult1, expireResult1) = generateTokenResult(10000, -1, 1)
+      // if expiryTimePeriodMs < 0, token will be expired immediately.
+      
assertTrue(createResult1.delegationToken().get().tokenInfo().maxTimestamp() > 
expireResult1.expiryTimestamp().get())
+
+      // expireDelegationToken will decrease the value of expiryTimestamp, 
since this token is not expired,
+      // expiryTimestamp will be set to min(now + expiryTimePeriodMs, 
maxTimestamp),
+      // in this case, maxTimestamp is smaller, so expiryTimestamp will not be 
modified
+      val (createResult2, expireResult2) = generateTokenResult(50000, 100000, 
1)
+      
assert(createResult2.delegationToken().get().tokenInfo().expiryTimestamp() == 
expireResult2.expiryTimestamp().get())
+
+      // since previous token is not expired yet, we need to set 
expectedTokenNum to 2
+      val (createResult3, expireResult3) = generateTokenResult(5000, 2000, 2)
+      
assert(createResult3.delegationToken().get().tokenInfo().expiryTimestamp() > 
expireResult3.expiryTimestamp().get())
+    } finally client.close(time.Duration.ZERO)
+  }
+
   @ParameterizedTest
   @ValueSource(strings = Array("zk", "kraft"))
   def testAclOperations(quorum: String): Unit = {

Reply via email to