This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a6062d08682 KAFKA-17137 Feat admin client it acl configs (#16648)
a6062d08682 is described below
commit a6062d08682b5459a904d0abd044be1c51b4f981
Author: Eric Chang <[email protected]>
AuthorDate: Sat Aug 31 12:29:39 2024 +0800
KAFKA-17137 Feat admin client it acl configs (#16648)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../admin/ExpireDelegationTokenOptions.java | 9 ++-
.../kafka/api/PlaintextAdminIntegrationTest.scala | 51 +++++++++++++-
.../kafka/api/SaslSslAdminIntegrationTest.scala | 80 +++++++++++++++++++++-
3 files changed, 135 insertions(+), 5 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java
b/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java
index 3bf9489a05e..a3f1462f628 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java
@@ -28,7 +28,14 @@ import org.apache.kafka.common.annotation.InterfaceStability;
public class ExpireDelegationTokenOptions extends
AbstractOptions<ExpireDelegationTokenOptions> {
private long expiryTimePeriodMs = -1L;
- public ExpireDelegationTokenOptions expiryTimePeriodMs(long
expiryTimePeriodMs) {
+ /**
+ * @param expiryTimePeriodMs the time period until we should expire this
token.
+ * {@code expiryTimePeriodMs} >= 0: the token will update the
expiration timestamp to min(now + expiryTimePeriodMs, maxTimestamp)
+ * {@code expiryTimePeriodMs} < 0: token will be expired immediately.
+ */
+ public ExpireDelegationTokenOptions expiryTimePeriodMs(
+ long expiryTimePeriodMs
+ ) {
this.expiryTimePeriodMs = expiryTimePeriodMs;
return this;
}
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 9f871279333..c3d0c4de80a 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -96,6 +96,50 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @Timeout(30)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeConfigWithOptionTimeoutMs(quorum: String): Unit = {
+ val config = createConfig
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
s"localhost:${TestUtils.IncorrectBrokerPort}")
+ val brokenClient = Admin.create(config)
+
+ try {
+ // Describe and broker
+ val brokerResource1 = new ConfigResource(ConfigResource.Type.BROKER,
brokers(1).config.brokerId.toString)
+ val brokerResource2 = new ConfigResource(ConfigResource.Type.BROKER,
brokers(2).config.brokerId.toString)
+ val configResources = Seq(brokerResource1, brokerResource2)
+
+ val exception = assertThrows(classOf[ExecutionException], () => {
+ brokenClient.describeConfigs(configResources.asJava,new
DescribeConfigsOptions().timeoutMs(0)).all().get()
+ })
+ assertInstanceOf(classOf[TimeoutException], exception.getCause)
+ } finally brokenClient.close(time.Duration.ZERO)
+ }
+
+ @ParameterizedTest
+ @Timeout(30)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAlterConfigsWithOptionTimeoutMs(quorum: String): Unit = {
+ client = createAdminClient
+ val config = createConfig
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
s"localhost:${TestUtils.IncorrectBrokerPort}")
+ val brokenClient = Admin.create(config)
+
+ try {
+ val alterLogLevelsEntries = Seq(
+ new ConfigEntry("kafka.controller.KafkaController",
LogLevelConfig.INFO_LOG_LEVEL)
+ ).asJavaCollection
+
+ val exception = assertThrows(classOf[ExecutionException], () => {
+ brokenClient.alterConfigs(
+ Map(brokerLoggerConfigResource -> new
Config(alterLogLevelsEntries)).asJava,
+ new AlterConfigsOptions().timeoutMs(0)).all().get()
+ })
+ assertInstanceOf(classOf[TimeoutException], exception.getCause)
+ } finally brokenClient.close(time.Duration.ZERO)
+ }
+
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testCreatePartitionWithOptionRetryOnQuotaViolation(quorum: String): Unit
= {
@@ -108,8 +152,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID ->
clientId).asJava)
val configEntries =
Map(QuotaConfigs.CONTROLLER_MUTATION_RATE_OVERRIDE_CONFIG -> 1.0,
QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 3.0)
- client.alterClientQuotas(Seq(new ClientQuotaAlteration(entity,
configEntries.map {case (k, v) =>
- new
ClientQuotaAlteration.Op(k,v)}.asJavaCollection)).asJavaCollection).all.get
+ client.alterClientQuotas(Seq(new ClientQuotaAlteration(entity,
configEntries.map { case (k, v) =>
+ new ClientQuotaAlteration.Op(k, v)
+ }.asJavaCollection)).asJavaCollection).all.get
TestUtils.waitUntilTrue(() => {
// wait for our ClientQuotaEntity to be set
@@ -118,7 +163,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val quotaEntities =
client.describeClientQuotas(ClientQuotaFilter.all()).entities().get()
- assertEquals(configEntries,quotaEntities.get(entity).asScala)
+ assertEquals(configEntries, quotaEntities.get(entity).asScala)
}
@ParameterizedTest
diff --git
a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
index f08db242944..d7ba9430990 100644
---
a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
@@ -12,6 +12,7 @@
*/
package kafka.api
+import java.time
import kafka.security.JaasTestUtils
import kafka.security.authorizer.AclAuthorizer
import kafka.utils.TestUtils._
@@ -22,11 +23,12 @@ import org.apache.kafka.common.acl._
import org.apache.kafka.common.acl.AclOperation.{ALL, ALTER, ALTER_CONFIGS,
CLUSTER_ACTION, CREATE, DELETE, DESCRIBE, IDEMPOTENT_WRITE}
import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
import org.apache.kafka.common.config.{ConfigResource, SaslConfigs,
TopicConfig}
-import org.apache.kafka.common.errors.{ClusterAuthorizationException,
DelegationTokenExpiredException, DelegationTokenNotFoundException,
InvalidRequestException, TopicAuthorizationException,
UnknownTopicOrPartitionException}
+import org.apache.kafka.common.errors.{ClusterAuthorizationException,
DelegationTokenExpiredException, DelegationTokenNotFoundException,
InvalidRequestException, TimeoutException, TopicAuthorizationException,
UnknownTopicOrPartitionException}
import org.apache.kafka.common.resource.PatternType.LITERAL
import org.apache.kafka.common.resource.ResourceType.{GROUP, TOPIC}
import org.apache.kafka.common.resource.{PatternType, Resource,
ResourcePattern, ResourcePatternFilter, ResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.common.utils.SecurityUtils
import org.apache.kafka.common.security.token.delegation.DelegationToken
import org.apache.kafka.security.authorizer.AclEntry.{WILDCARD_HOST,
WILDCARD_PRINCIPAL_STRING}
import org.apache.kafka.server.config.{DelegationTokenManagerConfigs,
ServerConfigs, ZkConfigs}
@@ -38,6 +40,7 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import java.util
+import java.util.Collections
import scala.collection.Seq
import scala.compat.java8.OptionConverters._
import scala.concurrent.ExecutionException
@@ -51,11 +54,14 @@ class SaslSslAdminIntegrationTest extends
BaseAdminIntegrationTest with SaslSetu
val kraftAuthorizerClassName = classOf[StandardAuthorizer].getName
val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
JaasTestUtils.KAFKA_SERVER_PRINCIPAL_UNQUALIFIED_NAME)
var superUserAdmin: Admin = _
+ val secretKey = "secretKey"
override protected def securityProtocol = SecurityProtocol.SASL_SSL
override protected lazy val trustStoreFile =
Some(TestUtils.tempFile("truststore", ".jks"))
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
+ // set this to use delegation token
+
this.serverConfig.setProperty(DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG,
secretKey)
if (TestInfoUtils.isKRaft(testInfo)) {
this.serverConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG,
kraftAuthorizerClassName)
this.controllerConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG,
kraftAuthorizerClassName)
@@ -132,6 +138,78 @@ class SaslSslAdminIntegrationTest extends
BaseAdminIntegrationTest with SaslSetu
val groupAcl = new AclBinding(new ResourcePattern(ResourceType.GROUP, "*",
PatternType.LITERAL),
new AccessControlEntry("User:*", "*", AclOperation.ALL,
AclPermissionType.ALLOW))
+ @ParameterizedTest
+ @Timeout(30)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAclOperationsWithOptionTimeoutMs(quorum: String): Unit = {
+ val config = createConfig
+ // this will cause timeout connecting to broker
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
s"localhost:${TestUtils.IncorrectBrokerPort}")
+ val brokenClient = Admin.create(config)
+
+ try {
+ val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC,
"mytopic3", PatternType.LITERAL),
+ new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE,
AclPermissionType.ALLOW))
+ val exception = assertThrows(classOf[ExecutionException], () => {
+ brokenClient.createAcls(Collections.singleton(acl), new
CreateAclsOptions().timeoutMs(0)).all().get()
+ })
+ assertInstanceOf(classOf[TimeoutException], exception.getCause)
+ } finally brokenClient.close(time.Duration.ZERO)
+ }
+
+ @ParameterizedTest
+ @Timeout(30)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDeleteAclsWithOptionTimeoutMs(quorum: String): Unit = {
+ val config = createConfig
+ // this will cause timeout connecting to broker
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
s"localhost:${TestUtils.IncorrectBrokerPort}")
+ val brokenClient = Admin.create(config)
+
+ try {
+ val exception = assertThrows(classOf[ExecutionException], () => {
+ brokenClient.deleteAcls(Collections.singleton(AclBindingFilter.ANY),
new DeleteAclsOptions().timeoutMs(0)).all().get()
+ })
+ assertInstanceOf(classOf[TimeoutException], exception.getCause)
+ } finally brokenClient.close(time.Duration.ZERO)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk","kraft"))
+ def testExpireDelegationTokenWithOptionExpireTimePeriodMs(quorum: String):
Unit = {
+ client = createAdminClient
+ val renewer = List(SecurityUtils.parseKafkaPrincipal("User:renewer"))
+
+ def generateTokenResult(maxLifeTimeMs: Int, expiryTimePeriodMs: Int,
expectedTokenNum: Int): (CreateDelegationTokenResult,
ExpireDelegationTokenResult) = {
+ val createResult = client.createDelegationToken(new
CreateDelegationTokenOptions().renewers(renewer.asJava).maxlifeTimeMs(maxLifeTimeMs))
+ val tokenCreated = createResult.delegationToken.get
+ TestUtils.waitUntilTrue(() => brokers.forall(server =>
server.tokenCache.tokens().size() == expectedTokenNum),
+ "Timed out waiting for token to propagate to all servers")
+ val expireResult = client.expireDelegationToken(
+ tokenCreated.hmac(),
+ new
ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs)
+ )
+ (createResult, expireResult)
+ }
+
+ try {
+ // Note that maxTimestamp = token created time + maxLifeTimeMs
+ val (createResult1, expireResult1) = generateTokenResult(10000, -1, 1)
+ // if expiryTimePeriodMs < 0, token will be expired immediately.
+
assertTrue(createResult1.delegationToken().get().tokenInfo().maxTimestamp() >
expireResult1.expiryTimestamp().get())
+
+ // expireDelegationToken will decrease the value of expiryTimestamp,
since this token is not expired,
+ // expiryTimestamp will be set to min(now + expiryTimePeriodMs,
maxTimestamp),
+ // in this case, maxTimestamp is smaller, so expiryTimestamp will not be
modified
+ val (createResult2, expireResult2) = generateTokenResult(50000, 100000,
1)
+
assert(createResult2.delegationToken().get().tokenInfo().expiryTimestamp() ==
expireResult2.expiryTimestamp().get())
+
+ // since previous token is not expired yet, we need to set
expectedTokenNum to 2
+ val (createResult3, expireResult3) = generateTokenResult(5000, 2000, 2)
+
assert(createResult3.delegationToken().get().tokenInfo().expiryTimestamp() >
expireResult3.expiryTimestamp().get())
+ } finally client.close(time.Duration.ZERO)
+ }
+
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testAclOperations(quorum: String): Unit = {