This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f4d38b996b6 KAFKA-20564 Move AuthorizerTest to server (#22261)
f4d38b996b6 is described below
commit f4d38b996b688959043623162568c2f650a79463
Author: Mickael Maison <[email protected]>
AuthorDate: Fri May 15 15:08:41 2026 +0200
KAFKA-20564 Move AuthorizerTest to server (#22261)
Merge BaseAuthorizerTest and AuthorizerTest to move them to the server
module
Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
build.gradle | 1 +
checkstyle/import-control-server-common.xml | 1 +
checkstyle/import-control-server.xml | 1 +
.../kafka/security/authorizer/AuthorizerTest.scala | 682 -------------
.../security/authorizer/BaseAuthorizerTest.scala | 373 -------
.../apache/kafka/server/util/ServerTestUtils.java | 35 +
.../kafka/security/authorizer/AuthorizerTest.java | 1069 ++++++++++++++++++++
7 files changed, 1107 insertions(+), 1055 deletions(-)
diff --git a/build.gradle b/build.gradle
index 9f1329dccff..9cee79e0610 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1035,6 +1035,7 @@ project(':server') {
testImplementation project(':test-common:test-common-runtime')
testImplementation testFixtures(project(':storage:storage-api'))
testImplementation testFixtures(project(':server-common'))
+ testImplementation testFixtures(project(':metadata'))
testFixturesImplementation project(':clients')
testFixturesImplementation testFixtures(project(':clients'))
diff --git a/checkstyle/import-control-server-common.xml
b/checkstyle/import-control-server-common.xml
index fd44a4a3f4b..7d075dd71d7 100644
--- a/checkstyle/import-control-server-common.xml
+++ b/checkstyle/import-control-server-common.xml
@@ -134,6 +134,7 @@
<allow
class="org.apache.kafka.server.util.TopicFilter.IncludeList" />
<allow class="org.apache.kafka.test.TestUtils" />
+ <allow class="org.apache.kafka.server.authorizer.Authorizer" />
<!-- ServerTestUtils uses yammer metrics for test cleanup -->
<allow pkg="org.apache.kafka.server.metrics" />
<allow pkg="com.yammer.metrics.core" />
diff --git a/checkstyle/import-control-server.xml
b/checkstyle/import-control-server.xml
index 9f7cc661a66..db528b8c717 100644
--- a/checkstyle/import-control-server.xml
+++ b/checkstyle/import-control-server.xml
@@ -114,6 +114,7 @@
<allow pkg="org.apache.kafka.network" />
<allow pkg="org.apache.kafka.server" />
<allow pkg="org.apache.kafka.server.authorizer" />
+ <allow class="org.apache.kafka.controller.MockAclMutator" />
</subpackage>
<subpackage name="network">
diff --git
a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
deleted file mode 100644
index 14f4936c8a8..00000000000
--- a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
+++ /dev/null
@@ -1,682 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.security.authorizer
-
-import kafka.server.{KafkaConfig, QuorumTestHarness}
-import kafka.utils.TestUtils
-import org.apache.kafka.common.Endpoint
-import org.apache.kafka.common.acl.AclOperation._
-import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
-import org.apache.kafka.common.acl._
-import org.apache.kafka.common.errors.ApiException
-import org.apache.kafka.common.metrics.{Metrics, PluginMetrics}
-import org.apache.kafka.common.metrics.internals.PluginMetricsImpl
-import org.apache.kafka.common.requests.RequestContext
-import org.apache.kafka.common.resource.PatternType.{LITERAL, MATCH, PREFIXED}
-import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
-import org.apache.kafka.common.resource.ResourcePattern.WILDCARD_RESOURCE
-import org.apache.kafka.common.resource.ResourceType._
-import org.apache.kafka.common.resource.{PatternType, ResourcePattern,
ResourcePatternFilter, ResourceType}
-import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
-import org.apache.kafka.common.utils.internals.{SecurityUtils =>
JSecurityUtils}
-import org.apache.kafka.controller.MockAclMutator
-import org.apache.kafka.metadata.authorizer.{AuthorizerTestServerInfo,
StandardAuthorizer}
-import org.apache.kafka.security.authorizer.AclEntry.{WILDCARD_HOST,
WILDCARD_PRINCIPAL_STRING}
-import org.apache.kafka.server.authorizer._
-import org.apache.kafka.server.util.ServerTestUtils
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
-import org.junit.jupiter.api.Test
-
-import java.net.InetAddress
-import java.util
-import java.util.{Properties, UUID}
-import scala.jdk.CollectionConverters._
-
-class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
-
- private final val PLAINTEXT = new Endpoint("PLAINTEXT",
SecurityProtocol.PLAINTEXT, "127.0.0.1", 9020)
-
- private val allowReadAcl = new AccessControlEntry(WILDCARD_PRINCIPAL_STRING,
WILDCARD_HOST, READ, ALLOW)
- private val allowWriteAcl = new
AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, WRITE, ALLOW)
- private val denyReadAcl = new AccessControlEntry(WILDCARD_PRINCIPAL_STRING,
WILDCARD_HOST, READ, DENY)
-
- private val wildCardResource = new ResourcePattern(TOPIC, WILDCARD_RESOURCE,
LITERAL)
- private val prefixedResource = new ResourcePattern(TOPIC, "foo", PREFIXED)
- private val clusterResource = new ResourcePattern(CLUSTER, CLUSTER_NAME,
LITERAL)
- private val wildcardPrincipal =
JSecurityUtils.parseKafkaPrincipal(WILDCARD_PRINCIPAL_STRING)
-
- private var authorizer1: Authorizer = _
- private var authorizer2: Authorizer = _
-
- private var _testInfo: TestInfo = _
-
- class CustomPrincipal(principalType: String, name: String) extends
KafkaPrincipal(principalType, name) {
- override def equals(o: scala.Any): Boolean = false
- }
-
- override def authorizer: Authorizer = authorizer1
-
- def testInfo: TestInfo = _testInfo
-
- @BeforeEach
- override def setUp(testInfo: TestInfo): Unit = {
- super.setUp(testInfo)
- _testInfo = testInfo
-
- val props = properties
- config = KafkaConfig.fromProps(props)
- authorizer1 = createAuthorizer()
- configureAuthorizer(authorizer1, config.originals, new
PluginMetricsImpl(new Metrics(), util.Map.of()))
- authorizer2 = createAuthorizer()
- configureAuthorizer(authorizer2, config.originals, new
PluginMetricsImpl(new Metrics(), util.Map.of()))
- resource = new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), LITERAL)
- }
-
- def properties: Properties = {
- val props = TestUtils.createBrokerConfig(0)
- props.put(StandardAuthorizer.SUPER_USERS_CONFIG, superUsers)
- props
- }
-
- @AfterEach
- override def tearDown(): Unit = {
- authorizer1.close()
- authorizer2.close()
- ServerTestUtils.clearYammerMetrics()
- super.tearDown()
- }
-
- @Test
- def testAuthorizeThrowsOnNonLiteralResource(): Unit = {
- assertThrows(classOf[IllegalArgumentException], () =>
authorize(authorizer1, requestContext, READ,
- new ResourcePattern(TOPIC, "something", PREFIXED)))
- }
-
- @Test
- def testAuthorizeWithEmptyResourceName(): Unit = {
- assertFalse(authorize(authorizer1, requestContext, READ, new
ResourcePattern(GROUP, "", LITERAL)))
- addAcls(authorizer1, Set(allowReadAcl), new ResourcePattern(GROUP,
WILDCARD_RESOURCE, LITERAL))
- assertTrue(authorize(authorizer1, requestContext, READ, new
ResourcePattern(GROUP, "", LITERAL)))
- }
-
- // Authorizing the empty resource is not supported because empty resource
name is invalid.
- @Test
- def testEmptyAclThrowsException(): Unit = {
- assertThrows(classOf[ApiException],
- () => addAcls(authorizer1, Set(allowReadAcl), new ResourcePattern(GROUP,
"", LITERAL)))
- }
-
- @Test
- def testTopicAcl(): Unit = {
- val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "rob")
- val user3 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "batman")
- val host1 = InetAddress.getByName("192.168.1.1")
- val host2 = InetAddress.getByName("192.168.1.2")
-
- //user1 has READ access from host1 and host2.
- val acl1 = new AccessControlEntry(user1.toString, host1.getHostAddress,
READ, ALLOW)
- val acl2 = new AccessControlEntry(user1.toString, host2.getHostAddress,
READ, ALLOW)
-
- //user1 does not have READ access from host1.
- val acl3 = new AccessControlEntry(user1.toString, host1.getHostAddress,
READ, DENY)
-
- //user1 has WRITE access from host1 only.
- val acl4 = new AccessControlEntry(user1.toString, host1.getHostAddress,
WRITE, ALLOW)
-
- //user1 has DESCRIBE access from all hosts.
- val acl5 = new AccessControlEntry(user1.toString, WILDCARD_HOST, DESCRIBE,
ALLOW)
-
- //user2 has READ access from all hosts.
- val acl6 = new AccessControlEntry(user2.toString, WILDCARD_HOST, READ,
ALLOW)
-
- //user3 has WRITE access from all hosts.
- val acl7 = new AccessControlEntry(user3.toString, WILDCARD_HOST, WRITE,
ALLOW)
-
- val acls = Set(acl1, acl2, acl3, acl4, acl5, acl6, acl7)
-
- changeAclAndVerify(Set.empty, acls, Set.empty)
-
- val host1Context = newRequestContext(user1, host1)
- val host2Context = newRequestContext(user1, host2)
-
- assertTrue(authorize(authorizer1, host2Context, READ, resource), "User1
should have READ access from host2")
- assertFalse(authorize(authorizer1, host1Context, READ, resource), "User1
should not have READ access from host1 due to denyAcl")
- assertTrue(authorize(authorizer1, host1Context, WRITE, resource), "User1
should have WRITE access from host1")
- assertFalse(authorize(authorizer1, host2Context, WRITE, resource), "User1
should not have WRITE access from host2 as no allow acl is defined")
- assertTrue(authorize(authorizer1, host1Context, DESCRIBE, resource),
"User1 should not have DESCRIBE access from host1")
- assertTrue(authorize(authorizer1, host2Context, DESCRIBE, resource),
"User1 should have DESCRIBE access from host2")
- assertFalse(authorize(authorizer1, host1Context, ALTER, resource), "User1
should not have edit access from host1")
- assertFalse(authorize(authorizer1, host2Context, ALTER, resource), "User1
should not have edit access from host2")
-
- //test if user has READ or WRITE access they also get DESCRIBE access
- val user2Context = newRequestContext(user2, host1)
- val user3Context = newRequestContext(user3, host1)
- assertTrue(authorize(authorizer1, user2Context, DESCRIBE, resource),
"User2 should have DESCRIBE access from host1")
- assertTrue(authorize(authorizer1, user3Context, DESCRIBE, resource),
"User3 should have DESCRIBE access from host2")
- assertTrue(authorize(authorizer1, user2Context, READ, resource), "User2
should have READ access from host1")
- assertTrue(authorize(authorizer1, user3Context, WRITE, resource), "User3
should have WRITE access from host2")
- }
-
- /**
- * CustomPrincipals should be compared with their principal type and name
- */
- @Test
- def testAllowAccessWithCustomPrincipal(): Unit = {
- val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- val customUserPrincipal = new CustomPrincipal(KafkaPrincipal.USER_TYPE,
username)
- val host1 = InetAddress.getByName("192.168.1.1")
- val host2 = InetAddress.getByName("192.168.1.2")
-
- // user has READ access from host2 but not from host1
- val acl1 = new AccessControlEntry(user.toString, host1.getHostAddress,
READ, DENY)
- val acl2 = new AccessControlEntry(user.toString, host2.getHostAddress,
READ, ALLOW)
- val acls = Set(acl1, acl2)
- changeAclAndVerify(Set.empty, acls, Set.empty)
-
- val host1Context = newRequestContext(customUserPrincipal, host1)
- val host2Context = newRequestContext(customUserPrincipal, host2)
-
- assertTrue(authorize(authorizer1, host2Context, READ, resource), "User1
should have READ access from host2")
- assertFalse(authorize(authorizer1, host1Context, READ, resource), "User1
should not have READ access from host1 due to denyAcl")
- }
-
- @Test
- def testDenyTakesPrecedence(): Unit = {
- val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- val host = InetAddress.getByName("192.168.2.1")
- val session = newRequestContext(user, host)
-
- val allowAll = new AccessControlEntry(WILDCARD_PRINCIPAL_STRING,
WILDCARD_HOST, AclOperation.ALL, ALLOW)
- val denyAcl = new AccessControlEntry(user.toString, host.getHostAddress,
AclOperation.ALL, DENY)
- val acls = Set(allowAll, denyAcl)
-
- changeAclAndVerify(Set.empty, acls, Set.empty)
-
- assertFalse(authorize(authorizer1, session, READ, resource), "deny should
take precedence over allow.")
- }
-
- @Test
- def testAllowAllAccess(): Unit = {
- val allowAllAcl = new AccessControlEntry(WILDCARD_PRINCIPAL_STRING,
WILDCARD_HOST, AclOperation.ALL, ALLOW)
-
- changeAclAndVerify(Set.empty, Set(allowAllAcl), Set.empty)
-
- val context = newRequestContext(new
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "random"),
InetAddress.getByName("192.0.4.4"))
- assertTrue(authorize(authorizer1, context, READ, resource), "allow all acl
should allow access to all.")
- }
-
- @Test
- def testSuperUserHasAccess(): Unit = {
- val denyAllAcl = new AccessControlEntry(WILDCARD_PRINCIPAL_STRING,
WILDCARD_HOST, AclOperation.ALL, DENY)
-
- changeAclAndVerify(Set.empty, Set(denyAllAcl), Set.empty)
-
- val session1 = newRequestContext(new
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"),
InetAddress.getByName("192.0.4.4"))
- val session2 = newRequestContext(new
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser2"),
InetAddress.getByName("192.0.4.4"))
-
- assertTrue(authorize(authorizer1, session1, READ, resource), "superuser
always has access, no matter what acls.")
- assertTrue(authorize(authorizer1, session2, READ, resource), "superuser
always has access, no matter what acls.")
- }
-
- /**
- * CustomPrincipals should be compared with their principal type and name
- */
- @Test
- def testSuperUserWithCustomPrincipalHasAccess(): Unit = {
- val denyAllAcl = new AccessControlEntry(WILDCARD_PRINCIPAL_STRING,
WILDCARD_HOST, AclOperation.ALL, DENY)
- changeAclAndVerify(Set.empty, Set(denyAllAcl), Set.empty)
-
- val session = newRequestContext(new
CustomPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"),
InetAddress.getByName("192.0.4.4"))
-
- assertTrue(authorize(authorizer1, session, READ, resource), "superuser
with custom principal always has access, no matter what acls.")
- }
-
- @Test
- def testWildCardAcls(): Unit = {
- assertFalse(authorize(authorizer1, requestContext, READ, resource), "when
acls = [], authorizer should fail close.")
-
- val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- val host1 = InetAddress.getByName("192.168.3.1")
- val readAcl = new AccessControlEntry(user1.toString, host1.getHostAddress,
READ, ALLOW)
-
- val acls = changeAclAndVerify(Set.empty, Set(readAcl), Set.empty,
wildCardResource)
-
- val host1Context = newRequestContext(user1, host1)
- assertTrue(authorize(authorizer1, host1Context, READ, resource), "User1
should have READ access from host1")
-
- //allow WRITE to specific topic.
- val writeAcl = new AccessControlEntry(user1.toString,
host1.getHostAddress, WRITE, ALLOW)
- changeAclAndVerify(Set.empty, Set(writeAcl), Set.empty)
-
- //deny WRITE to wild card topic.
- val denyWriteOnWildCardResourceAcl = new
AccessControlEntry(user1.toString, host1.getHostAddress, WRITE, DENY)
- changeAclAndVerify(acls, Set(denyWriteOnWildCardResourceAcl), Set.empty,
wildCardResource)
-
- assertFalse(authorize(authorizer1, host1Context, WRITE, resource), "User1
should not have WRITE access from host1")
- }
-
- @Test
- def testNoAclFound(): Unit = {
- assertFalse(authorize(authorizer1, requestContext, READ, resource), "when
acls = [], authorizer should deny op.")
- }
-
- @Test
- def testNoAclFoundOverride(): Unit = {
- val props = properties
- props.put(StandardAuthorizer.ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG,
"true")
-
- val cfg = KafkaConfig.fromProps(props)
- val testAuthorizer = createAuthorizer()
- try {
- configureAuthorizer(testAuthorizer, cfg.originals, new
PluginMetricsImpl(new Metrics(), util.Map.of()))
- assertTrue(authorize(testAuthorizer, requestContext, READ, resource),
- "when acls = null or [], authorizer should allow op with
allow.everyone = true.")
- } finally {
- testAuthorizer.close()
- }
- }
-
- @Test
- def testAclConfigWithWhitespace(): Unit = {
- val props = properties
- props.put(StandardAuthorizer.ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "
true")
- // replace all property values with leading & trailing whitespaces
- props.replaceAll((_, v) => " " + v + " ")
- val cfg = KafkaConfig.fromProps(props)
- val testAuthorizer = createAuthorizer()
- try {
- configureAuthorizer(testAuthorizer, cfg.originals, new
PluginMetricsImpl(new Metrics(), util.Map.of()))
- assertTrue(authorize(testAuthorizer, requestContext, READ, resource),
- "when acls = null or [], authorizer should allow op with
allow.everyone = true.")
- } finally {
- testAuthorizer.close()
- }
- }
-
- @Test
- def testAclManagementAPIs(): Unit = {
- val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
- val host1 = "host1"
- val host2 = "host2"
-
- val acl1 = new AccessControlEntry(user1.toString, host1, READ, ALLOW)
- val acl2 = new AccessControlEntry(user1.toString, host1, WRITE, ALLOW)
- val acl3 = new AccessControlEntry(user2.toString, host2, READ, ALLOW)
- val acl4 = new AccessControlEntry(user2.toString, host2, WRITE, ALLOW)
-
- var acls = changeAclAndVerify(Set.empty, Set(acl1, acl2, acl3, acl4),
Set.empty)
-
- //test addAcl is additive
- val acl5 = new AccessControlEntry(user2.toString, WILDCARD_HOST, READ,
ALLOW)
- acls = changeAclAndVerify(acls, Set(acl5), Set.empty)
-
- //test get by principal name.
- TestUtils.waitUntilTrue(() => Set(acl1, acl2).map(acl => new
AclBinding(resource, acl)) == getAcls(authorizer1, user1),
- "changes not propagated in timeout period")
- TestUtils.waitUntilTrue(() => Set(acl3, acl4, acl5).map(acl => new
AclBinding(resource, acl)) == getAcls(authorizer1, user2),
- "changes not propagated in timeout period")
-
- val resourceToAcls = Map[ResourcePattern, Set[AccessControlEntry]](
- new ResourcePattern(TOPIC, WILDCARD_RESOURCE, LITERAL) -> Set(new
AccessControlEntry(user2.toString, WILDCARD_HOST, READ, ALLOW)),
- new ResourcePattern(CLUSTER, WILDCARD_RESOURCE, LITERAL) -> Set(new
AccessControlEntry(user2.toString, host1, READ, ALLOW)),
- new ResourcePattern(GROUP, WILDCARD_RESOURCE, LITERAL) -> acls,
- new ResourcePattern(GROUP, "test-ConsumerGroup", LITERAL) -> acls
- )
-
- resourceToAcls foreach { case (key, value) =>
changeAclAndVerify(Set.empty, value, Set.empty, key) }
- val expectedAcls = (resourceToAcls + (resource -> acls)).flatMap {
- case (res, resAcls) => resAcls.map { acl => new AclBinding(res, acl) }
- }.toSet
- TestUtils.waitUntilTrue(() => expectedAcls == getAcls(authorizer1),
"changes not propagated in timeout period.")
-
- //test remove acl from existing acls.
- acls = changeAclAndVerify(acls, Set.empty, Set(acl1, acl5))
-
- //test remove all acls for resource
- removeAcls(authorizer1, Set.empty, resource)
- TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], authorizer1,
resource, AccessControlEntryFilter.ANY)
-
- acls = changeAclAndVerify(Set.empty, Set(acl1), Set.empty)
- changeAclAndVerify(acls, Set.empty, acls)
- }
-
- @Test
- def testLocalConcurrentModificationOfResourceAcls(): Unit = {
- val commonResource = new ResourcePattern(TOPIC, "test", LITERAL)
-
- val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- val acl1 = new AccessControlEntry(user1.toString, WILDCARD_HOST, READ,
ALLOW)
-
- val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
- val acl2 = new AccessControlEntry(user2.toString, WILDCARD_HOST, READ,
DENY)
-
- addAcls(authorizer1, Set(acl1), commonResource)
- addAcls(authorizer1, Set(acl2), commonResource)
-
- TestUtils.waitAndVerifyAcls(Set(acl1, acl2), authorizer1, commonResource,
AccessControlEntryFilter.ANY)
- }
-
- /**
- * Test ACL inheritance, as described in
#{org.apache.kafka.common.acl.AclOperation}
- */
- @Test
- def testAclInheritance(): Unit = {
- testImplicationsOfAllow(AclOperation.ALL, Set(READ, WRITE, CREATE, DELETE,
ALTER, DESCRIBE,
- CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE,
CREATE_TOKENS, DESCRIBE_TOKENS, TWO_PHASE_COMMIT))
- testImplicationsOfDeny(AclOperation.ALL, Set(READ, WRITE, CREATE, DELETE,
ALTER, DESCRIBE,
- CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE,
CREATE_TOKENS, DESCRIBE_TOKENS, TWO_PHASE_COMMIT))
- testImplicationsOfAllow(READ, Set(DESCRIBE))
- testImplicationsOfAllow(WRITE, Set(DESCRIBE))
- testImplicationsOfAllow(DELETE, Set(DESCRIBE))
- testImplicationsOfAllow(ALTER, Set(DESCRIBE))
- testImplicationsOfDeny(DESCRIBE, Set())
- testImplicationsOfAllow(ALTER_CONFIGS, Set(DESCRIBE_CONFIGS))
- testImplicationsOfDeny(DESCRIBE_CONFIGS, Set())
- }
-
- private def testImplicationsOfAllow(parentOp: AclOperation, allowedOps:
Set[AclOperation]): Unit = {
- val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- val host = InetAddress.getByName("192.168.3.1")
- val hostContext = newRequestContext(user, host)
- val acl = new AccessControlEntry(user.toString, WILDCARD_HOST, parentOp,
ALLOW)
- addAcls(authorizer1, Set(acl), clusterResource)
- AclOperation.values.filter(validOp).foreach { op =>
- val authorized = authorize(authorizer1, hostContext, op, clusterResource)
- if (allowedOps.contains(op) || op == parentOp)
- assertTrue(authorized, s"ALLOW $parentOp should imply ALLOW $op")
- else
- assertFalse(authorized, s"ALLOW $parentOp should not imply ALLOW $op")
- }
- removeAcls(authorizer1, Set(acl), clusterResource)
- }
-
- private def testImplicationsOfDeny(parentOp: AclOperation, deniedOps:
Set[AclOperation]): Unit = {
- val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- val host1 = InetAddress.getByName("192.168.3.1")
- val host1Context = newRequestContext(user1, host1)
- val acls = Set(new AccessControlEntry(user1.toString, WILDCARD_HOST,
parentOp, DENY),
- new AccessControlEntry(user1.toString, WILDCARD_HOST, AclOperation.ALL,
ALLOW))
- addAcls(authorizer1, acls, clusterResource)
- AclOperation.values.filter(validOp).foreach { op =>
- val authorized = authorize(authorizer1, host1Context, op,
clusterResource)
- if (deniedOps.contains(op) || op == parentOp)
- assertFalse(authorized, s"DENY $parentOp should imply DENY $op")
- else
- assertTrue(authorized, s"DENY $parentOp should not imply DENY $op")
- }
- removeAcls(authorizer1, acls, clusterResource)
- }
-
- @Test
- def testAccessAllowedIfAllowAclExistsOnWildcardResource(): Unit = {
- addAcls(authorizer1, Set(allowReadAcl), wildCardResource)
-
- assertTrue(authorize(authorizer1, requestContext, READ, resource))
- }
-
- @Test
- def testDeleteAclOnWildcardResource(): Unit = {
- addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), wildCardResource)
-
- removeAcls(authorizer1, Set(allowReadAcl), wildCardResource)
-
- assertEquals(Set(allowWriteAcl), getAcls(authorizer1, wildCardResource))
- }
-
- @Test
- def testDeleteAllAclOnWildcardResource(): Unit = {
- addAcls(authorizer1, Set(allowReadAcl), wildCardResource)
-
- removeAcls(authorizer1, Set.empty, wildCardResource)
-
- assertEquals(Set.empty, getAcls(authorizer1))
- }
-
- @Test
- def testAccessAllowedIfAllowAclExistsOnPrefixedResource(): Unit = {
- addAcls(authorizer1, Set(allowReadAcl), prefixedResource)
-
- assertTrue(authorize(authorizer1, requestContext, READ, resource))
- }
-
- @Test
- def testDeleteAclOnPrefixedResource(): Unit = {
- addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), prefixedResource)
-
- removeAcls(authorizer1, Set(allowReadAcl), prefixedResource)
-
- assertEquals(Set(allowWriteAcl), getAcls(authorizer1, prefixedResource))
- }
-
- @Test
- def testDeleteAllAclOnPrefixedResource(): Unit = {
- addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), prefixedResource)
-
- removeAcls(authorizer1, Set.empty, prefixedResource)
-
- assertEquals(Set.empty, getAcls(authorizer1))
- }
-
- @Test
- def testAddAclsOnLiteralResource(): Unit = {
- addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), resource)
- addAcls(authorizer1, Set(allowWriteAcl, denyReadAcl), resource)
-
- assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl),
getAcls(authorizer1, resource))
- assertEquals(Set.empty, getAcls(authorizer1, wildCardResource))
- assertEquals(Set.empty, getAcls(authorizer1, prefixedResource))
- }
-
- @Test
- def testAddAclsOnWildcardResource(): Unit = {
- addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), wildCardResource)
- addAcls(authorizer1, Set(allowWriteAcl, denyReadAcl), wildCardResource)
-
- assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl),
getAcls(authorizer1, wildCardResource))
- assertEquals(Set.empty, getAcls(authorizer1, resource))
- assertEquals(Set.empty, getAcls(authorizer1, prefixedResource))
- }
-
- @Test
- def testAddAclsOnPrefixedResource(): Unit = {
- addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), prefixedResource)
- addAcls(authorizer1, Set(allowWriteAcl, denyReadAcl), prefixedResource)
-
- assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl),
getAcls(authorizer1, prefixedResource))
- assertEquals(Set.empty, getAcls(authorizer1, wildCardResource))
- assertEquals(Set.empty, getAcls(authorizer1, resource))
- }
-
- @Test
- def testAuthorizeWithPrefixedResource(): Unit = {
- addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC,
"a_other", LITERAL))
- addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC,
"a_other", PREFIXED))
- addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "foo-" +
UUID.randomUUID(), PREFIXED))
- addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "foo-" +
UUID.randomUUID(), PREFIXED))
- addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "foo-" +
UUID.randomUUID() + "-zzz", PREFIXED))
- addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "fooo-"
+ UUID.randomUUID(), PREFIXED))
- addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "fo-" +
UUID.randomUUID(), PREFIXED))
- addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "fop-" +
UUID.randomUUID(), PREFIXED))
- addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "fon-" +
UUID.randomUUID(), PREFIXED))
- addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "fon-",
PREFIXED))
- addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC,
"z_other", PREFIXED))
- addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC,
"z_other", LITERAL))
-
- addAcls(authorizer1, Set(allowReadAcl), prefixedResource)
-
- assertTrue(authorize(authorizer1, requestContext, READ, resource))
- }
-
- @Test
- def testSingleCharacterResourceAcls(): Unit = {
- addAcls(authorizer1, Set(allowReadAcl), new ResourcePattern(TOPIC, "f",
LITERAL))
- assertTrue(authorize(authorizer1, requestContext, READ, new
ResourcePattern(TOPIC, "f", LITERAL)))
- assertFalse(authorize(authorizer1, requestContext, READ, new
ResourcePattern(TOPIC, "foo", LITERAL)))
-
- addAcls(authorizer1, Set(allowReadAcl), new ResourcePattern(TOPIC, "_",
PREFIXED))
- assertTrue(authorize(authorizer1, requestContext, READ, new
ResourcePattern(TOPIC, "_foo", LITERAL)))
- assertTrue(authorize(authorizer1, requestContext, READ, new
ResourcePattern(TOPIC, "_", LITERAL)))
- assertFalse(authorize(authorizer1, requestContext, READ, new
ResourcePattern(TOPIC, "foo_", LITERAL)))
- }
-
- @Test
- def testGetAclsPrincipal(): Unit = {
- val aclOnSpecificPrincipal = new AccessControlEntry(principal.toString,
WILDCARD_HOST, WRITE, ALLOW)
- addAcls(authorizer1, Set(aclOnSpecificPrincipal), resource)
-
- assertEquals(0,
- getAcls(authorizer1, wildcardPrincipal).size, "acl on specific should
not be returned for wildcard request")
- assertEquals(1,
- getAcls(authorizer1, principal).size, "acl on specific should be
returned for specific request")
- assertEquals(1,
- getAcls(authorizer1, new KafkaPrincipal(principal.getPrincipalType,
principal.getName)).size, "acl on specific should be returned for different
principal instance")
-
- removeAcls(authorizer1, Set.empty, resource)
- val aclOnWildcardPrincipal = new
AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, WRITE, ALLOW)
- addAcls(authorizer1, Set(aclOnWildcardPrincipal), resource)
-
- assertEquals(1, getAcls(authorizer1, wildcardPrincipal).size, "acl on
wildcard should be returned for wildcard request")
- assertEquals(0, getAcls(authorizer1, principal).size, "acl on wildcard
should not be returned for specific request")
- }
-
- @Test
- def testAclsFilter(): Unit = {
- val resource1 = new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(),
LITERAL)
- val resource2 = new ResourcePattern(TOPIC, "bar-" + UUID.randomUUID(),
LITERAL)
- val prefixedResource = new ResourcePattern(TOPIC, "bar-", PREFIXED)
-
- val acl1 = new AclBinding(resource1, new
AccessControlEntry(principal.toString, WILDCARD_HOST, READ, ALLOW))
- val acl2 = new AclBinding(resource1, new
AccessControlEntry(principal.toString, "192.168.0.1", WRITE, ALLOW))
- val acl3 = new AclBinding(resource2, new
AccessControlEntry(principal.toString, WILDCARD_HOST, DESCRIBE, ALLOW))
- val acl4 = new AclBinding(prefixedResource, new
AccessControlEntry(wildcardPrincipal.toString, WILDCARD_HOST, READ, ALLOW))
-
- authorizer1.createAcls(requestContext, util.List.of(acl1, acl2, acl3,
acl4))
- assertEquals(Set(acl1, acl2, acl3, acl4),
authorizer1.acls(AclBindingFilter.ANY).asScala.toSet)
- assertEquals(Set(acl1, acl2), authorizer1.acls(new
AclBindingFilter(resource1.toFilter,
AccessControlEntryFilter.ANY)).asScala.toSet)
- assertEquals(Set(acl4), authorizer1.acls(new
AclBindingFilter(prefixedResource.toFilter,
AccessControlEntryFilter.ANY)).asScala.toSet)
- val matchingFilter = new AclBindingFilter(new
ResourcePatternFilter(ResourceType.ANY, resource2.name, MATCH),
AccessControlEntryFilter.ANY)
- assertEquals(Set(acl3, acl4),
authorizer1.acls(matchingFilter).asScala.toSet)
-
- val filters = List(matchingFilter,
- acl1.toFilter,
- new AclBindingFilter(resource2.toFilter, AccessControlEntryFilter.ANY),
- new AclBindingFilter(new ResourcePatternFilter(TOPIC, "baz",
PatternType.ANY), AccessControlEntryFilter.ANY))
- val deleteResults = authorizer1.deleteAcls(requestContext,
filters.asJava).asScala.map(_.toCompletableFuture.get)
- assertEquals(List.empty, deleteResults.filter(_.exception.isPresent))
- filters.indices.foreach { i =>
- assertEquals(Set.empty,
deleteResults(i).aclBindingDeleteResults.asScala.toSet.filter(_.exception.isPresent))
- }
- assertEquals(Set(acl3, acl4),
deleteResults(0).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
- assertEquals(Set(acl1),
deleteResults(1).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
- // standard authorizer first finds the acls that match filters and then
delete them.
- // So filters[2] will match acl3 even though it is also matching
filters[0] and will be deleted by it
- assertEquals(Set(acl3),
deleteResults(2).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
- assertEquals(Set.empty,
deleteResults(3).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
- }
-
- @Test
- def testAuthorizeByResourceTypeNoAclFoundOverride(): Unit = {
- val props = properties
- props.put(StandardAuthorizer.ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG,
"true")
-
- val cfg = KafkaConfig.fromProps(props)
- val authorizer: Authorizer = createAuthorizer()
- try {
- configureAuthorizer(authorizer, cfg.originals, new PluginMetricsImpl(new
Metrics(), util.Map.of()))
- assertTrue(authorizeByResourceType(authorizer, requestContext, READ,
resource.resourceType()),
- "If allow.everyone.if.no.acl.found = true, caller should have read
access to at least one topic")
- assertTrue(authorizeByResourceType(authorizer, requestContext, WRITE,
resource.resourceType()),
- "If allow.everyone.if.no.acl.found = true, caller should have write
access to at least one topic")
- } finally {
- authorizer.close()
- }
- }
-
- private def changeAclAndVerify(originalAcls: Set[AccessControlEntry],
- addedAcls: Set[AccessControlEntry],
- removedAcls: Set[AccessControlEntry],
- resource: ResourcePattern = resource):
Set[AccessControlEntry] = {
- var acls = originalAcls
-
- if (addedAcls.nonEmpty) {
- addAcls(authorizer1, addedAcls, resource)
- acls ++= addedAcls
- }
-
- if (removedAcls.nonEmpty) {
- removeAcls(authorizer1, removedAcls, resource)
- acls --= removedAcls
- }
-
- TestUtils.waitAndVerifyAcls(acls, authorizer1, resource,
AccessControlEntryFilter.ANY)
-
- acls
- }
-
- private def authorize(authorizer: Authorizer, requestContext:
RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = {
- val action = new Action(operation, resource, 1, true, true)
- authorizer.authorize(requestContext, util.List.of(action)).asScala.head ==
AuthorizationResult.ALLOWED
- }
-
- private def getAcls(authorizer: Authorizer, resourcePattern:
ResourcePattern): Set[AccessControlEntry] = {
- val acls = authorizer.acls(new AclBindingFilter(resourcePattern.toFilter,
AccessControlEntryFilter.ANY)).asScala.toSet
- acls.map(_.entry)
- }
-
- private def getAcls(authorizer: Authorizer, principal: KafkaPrincipal):
Set[AclBinding] = {
- val filter = new AclBindingFilter(ResourcePatternFilter.ANY,
- new AccessControlEntryFilter(principal.toString, null, AclOperation.ANY,
AclPermissionType.ANY))
- authorizer.acls(filter).asScala.toSet
- }
-
- private def getAcls(authorizer: Authorizer): Set[AclBinding] = {
- authorizer.acls(AclBindingFilter.ANY).asScala.toSet
- }
-
- private def validOp(op: AclOperation): Boolean = {
- op != AclOperation.ANY && op != AclOperation.UNKNOWN
- }
-
- def createAuthorizer(): Authorizer = {
- new StandardAuthorizer
- }
-
- def configureAuthorizer(authorizer: Authorizer,
- configs: util.Map[String, AnyRef],
- pluginMetrics: PluginMetrics): Unit = {
- configureStandardAuthorizer(authorizer.asInstanceOf[StandardAuthorizer],
configs, pluginMetrics)
- }
-
- def configureStandardAuthorizer(standardAuthorizer: StandardAuthorizer,
- configs: util.Map[String, AnyRef],
- pluginMetrics: PluginMetrics): Unit = {
- standardAuthorizer.configure(configs)
- standardAuthorizer.withPluginMetrics(pluginMetrics)
- initializeStandardAuthorizer(standardAuthorizer, new
AuthorizerTestServerInfo(util.List.of(PLAINTEXT)))
- }
-
- def initializeStandardAuthorizer(standardAuthorizer: StandardAuthorizer,
- serverInfo: AuthorizerServerInfo): Unit = {
- val aclMutator = new MockAclMutator(standardAuthorizer)
- standardAuthorizer.start(serverInfo)
- standardAuthorizer.setAclMutator(aclMutator)
- standardAuthorizer.completeInitialLoad()
- }
-}
diff --git
a/core/src/test/scala/unit/kafka/security/authorizer/BaseAuthorizerTest.scala
b/core/src/test/scala/unit/kafka/security/authorizer/BaseAuthorizerTest.scala
deleted file mode 100644
index c7726ff5245..00000000000
---
a/core/src/test/scala/unit/kafka/security/authorizer/BaseAuthorizerTest.scala
+++ /dev/null
@@ -1,373 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.security.authorizer
-
-import java.net.InetAddress
-import java.util.UUID
-import kafka.server.KafkaConfig
-import org.apache.kafka.common.acl.AclOperation.{ALL, READ, WRITE}
-import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
-import org.apache.kafka.common.acl.{AccessControlEntry,
AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation}
-import org.apache.kafka.common.network.{ClientInformation, ListenerName}
-import org.apache.kafka.common.protocol.ApiKeys
-import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
-import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
-import org.apache.kafka.common.resource.ResourcePattern.WILDCARD_RESOURCE
-import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC,
TRANSACTIONAL_ID}
-import org.apache.kafka.common.resource.{ResourcePattern, ResourceType}
-import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
-import org.apache.kafka.security.authorizer.AclEntry
-import org.apache.kafka.security.authorizer.AclEntry.{WILDCARD_HOST,
WILDCARD_PRINCIPAL_STRING}
-import org.apache.kafka.server.authorizer.{AuthorizationResult, Authorizer}
-import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue}
-import org.junit.jupiter.api.Test
-
-import scala.jdk.CollectionConverters._
-
-trait BaseAuthorizerTest {
-
- def authorizer: Authorizer
-
- val superUsers = "User:superuser1; User:superuser2"
- val username = "alice"
- val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- val requestContext: RequestContext = newRequestContext(principal,
InetAddress.getByName("192.168.0.1"))
- val superUserName = "superuser1"
- var config: KafkaConfig = _
- var resource: ResourcePattern = _
-
- @Test
- def testAuthorizeByResourceTypeMultipleAddAndRemove(): Unit = {
- val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
- val host1 = InetAddress.getByName("192.168.1.1")
- val resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(),
LITERAL)
- val denyRead = new AccessControlEntry(user1.toString,
host1.getHostAddress, READ, DENY)
- val allowRead = new AccessControlEntry(user1.toString,
host1.getHostAddress, READ, ALLOW)
- val u1h1Context = newRequestContext(user1, host1)
-
- for (_ <- 1 to 10) {
- assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC),
- "User1 from host1 should not have READ access to any topic when no ACL
exists")
-
- addAcls(authorizer, Set(allowRead), resource1)
- assertTrue(authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC),
- "User1 from host1 now should have READ access to at least one topic")
-
- for (_ <- 1 to 10) {
- addAcls(authorizer, Set(denyRead), resource1)
- assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC),
- "User1 from host1 now should not have READ access to any topic")
-
- removeAcls(authorizer, Set(denyRead), resource1)
- addAcls(authorizer, Set(allowRead), resource1)
- assertTrue(authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC),
- "User1 from host1 now should have READ access to at least one topic")
- }
-
- removeAcls(authorizer, Set(allowRead), resource1)
- assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC),
- "User1 from host1 now should not have READ access to any topic")
- }
- }
-
- @Test
- def testAuthorizeByResourceTypeIsolationUnrelatedDenyWontDominateAllow():
Unit = {
- val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
- val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user2")
- val host1 = InetAddress.getByName("192.168.1.1")
- val host2 = InetAddress.getByName("192.168.1.2")
- val resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(),
LITERAL)
- val resource2 = new ResourcePattern(TOPIC, "sb2" + UUID.randomUUID(),
LITERAL)
- val resource3 = new ResourcePattern(GROUP, "s", PREFIXED)
-
- val acl1 = new AccessControlEntry(user1.toString, host1.getHostAddress,
READ, DENY)
- val acl2 = new AccessControlEntry(user2.toString, host1.getHostAddress,
READ, DENY)
- val acl3 = new AccessControlEntry(user1.toString, host2.getHostAddress,
WRITE, DENY)
- val acl4 = new AccessControlEntry(user1.toString, host2.getHostAddress,
READ, DENY)
- val acl5 = new AccessControlEntry(user1.toString, host2.getHostAddress,
READ, DENY)
- val acl6 = new AccessControlEntry(user2.toString, host2.getHostAddress,
READ, DENY)
- val acl7 = new AccessControlEntry(user1.toString, host2.getHostAddress,
READ, ALLOW)
-
- addAcls(authorizer, Set(acl1, acl2, acl3, acl6, acl7), resource1)
- addAcls(authorizer, Set(acl4), resource2)
- addAcls(authorizer, Set(acl5), resource3)
-
- val u1h1Context = newRequestContext(user1, host1)
- val u1h2Context = newRequestContext(user1, host2)
-
- assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC),
- "User1 from host1 should not have READ access to any topic")
- assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.GROUP),
- "User1 from host2 should not have READ access to any consumer group")
- assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TRANSACTIONAL_ID),
- "User1 from host2 should not have READ access to any topic")
- assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.CLUSTER),
- "User1 from host2 should not have READ access to any topic")
- assertTrue(authorizeByResourceType(authorizer, u1h2Context, READ,
ResourceType.TOPIC),
- "User1 from host2 should have READ access to at least one topic")
- }
-
- @Test
- def testAuthorizeByResourceTypeDenyTakesPrecedence(): Unit = {
- val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
- val host1 = InetAddress.getByName("192.168.1.1")
- val resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(),
LITERAL)
-
- val u1h1Context = newRequestContext(user1, host1)
- val acl1 = new AccessControlEntry(user1.toString, host1.getHostAddress,
WRITE, ALLOW)
- val acl2 = new AccessControlEntry(user1.toString, host1.getHostAddress,
WRITE, DENY)
-
- addAcls(authorizer, Set(acl1), resource1)
- assertTrue(authorizeByResourceType(authorizer, u1h1Context, WRITE,
ResourceType.TOPIC),
- "User1 from host1 should have WRITE access to at least one topic")
-
- addAcls(authorizer, Set(acl2), resource1)
- assertFalse(authorizeByResourceType(authorizer, u1h1Context, WRITE,
ResourceType.TOPIC),
- "User1 from host1 should not have WRITE access to any topic")
- }
-
- @Test
- def testAuthorizeByResourceTypePrefixedResourceDenyDominate(): Unit = {
- val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
- val host1 = InetAddress.getByName("192.168.1.1")
- val a = new ResourcePattern(GROUP, "a", PREFIXED)
- val ab = new ResourcePattern(GROUP, "ab", PREFIXED)
- val abc = new ResourcePattern(GROUP, "abc", PREFIXED)
- val abcd = new ResourcePattern(GROUP, "abcd", PREFIXED)
- val abcde = new ResourcePattern(GROUP, "abcde", PREFIXED)
-
- val u1h1Context = newRequestContext(user1, host1)
- val allowAce = new AccessControlEntry(user1.toString,
host1.getHostAddress, READ, ALLOW)
- val denyAce = new AccessControlEntry(user1.toString, host1.getHostAddress,
READ, DENY)
-
- addAcls(authorizer, Set(allowAce), abcde)
- assertTrue(authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.GROUP),
- "User1 from host1 should have READ access to at least one group")
-
- addAcls(authorizer, Set(denyAce), abcd)
- assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.GROUP),
- "User1 from host1 now should not have READ access to any group")
-
- addAcls(authorizer, Set(allowAce), abc)
- assertTrue(authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.GROUP),
- "User1 from host1 now should have READ access to any group")
-
- addAcls(authorizer, Set(denyAce), a)
- assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.GROUP),
- "User1 from host1 now should not have READ access to any group")
-
- addAcls(authorizer, Set(allowAce), ab)
- assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.GROUP),
- "User1 from host1 still should not have READ access to any group")
- }
-
- @Test
- def testAuthorizeByResourceTypeWildcardResourceDenyDominate(): Unit = {
- val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
- val host1 = InetAddress.getByName("192.168.1.1")
- val wildcard = new ResourcePattern(GROUP,
ResourcePattern.WILDCARD_RESOURCE, LITERAL)
- val prefixed = new ResourcePattern(GROUP, "hello", PREFIXED)
- val literal = new ResourcePattern(GROUP, "aloha", LITERAL)
-
- val u1h1Context = newRequestContext(user1, host1)
- val allowAce = new AccessControlEntry(user1.toString,
host1.getHostAddress, WRITE, ALLOW)
- val denyAce = new AccessControlEntry(user1.toString, host1.getHostAddress,
WRITE, DENY)
-
- addAcls(authorizer, Set(allowAce), prefixed)
- assertTrue(authorizeByResourceType(authorizer, u1h1Context, WRITE,
ResourceType.GROUP),
- "User1 from host1 should have WRITE access to at least one group")
-
- addAcls(authorizer, Set(denyAce), wildcard)
- assertFalse(authorizeByResourceType(authorizer, u1h1Context, WRITE,
ResourceType.GROUP),
- "User1 from host1 now should not have WRITE access to any group")
-
- addAcls(authorizer, Set(allowAce), wildcard)
- assertFalse(authorizeByResourceType(authorizer, u1h1Context, WRITE,
ResourceType.GROUP),
- "User1 from host1 still should not have WRITE access to any group")
-
- addAcls(authorizer, Set(allowAce), literal)
- assertFalse(authorizeByResourceType(authorizer, u1h1Context, WRITE,
ResourceType.GROUP),
- "User1 from host1 still should not have WRITE access to any group")
- }
-
- @Test
- def testAuthorizeByResourceTypeWithAllOperationAce(): Unit = {
- val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
- val host1 = InetAddress.getByName("192.168.1.1")
- val resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(),
LITERAL)
- val denyAll = new AccessControlEntry(user1.toString, host1.getHostAddress,
ALL, DENY)
- val allowAll = new AccessControlEntry(user1.toString,
host1.getHostAddress, ALL, ALLOW)
- val denyWrite = new AccessControlEntry(user1.toString,
host1.getHostAddress, WRITE, DENY)
- val u1h1Context = newRequestContext(user1, host1)
-
- assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC),
- "User1 from host1 should not have READ access to any topic when no ACL
exists")
-
- addAcls(authorizer, Set(denyWrite, allowAll), resource1)
- assertTrue(authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC),
- "User1 from host1 now should have READ access to at least one topic")
-
- addAcls(authorizer, Set(denyAll), resource1)
- assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC),
- "User1 from host1 now should not have READ access to any topic")
- }
-
- @Test
- def testAuthorizeByResourceTypeWithAllHostAce(): Unit = {
- val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
- val host1 = InetAddress.getByName("192.168.1.1")
- val host2 = InetAddress.getByName("192.168.1.2")
- val allHost = AclEntry.WILDCARD_HOST
- val resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(),
LITERAL)
- val resource2 = new ResourcePattern(TOPIC, "sb2" + UUID.randomUUID(),
LITERAL)
- val allowHost1 = new AccessControlEntry(user1.toString,
host1.getHostAddress, READ, ALLOW)
- val denyHost1 = new AccessControlEntry(user1.toString,
host1.getHostAddress, READ, DENY)
- val denyAllHost = new AccessControlEntry(user1.toString, allHost, READ,
DENY)
- val allowAllHost = new AccessControlEntry(user1.toString, allHost, READ,
ALLOW)
- val u1h1Context = newRequestContext(user1, host1)
- val u1h2Context = newRequestContext(user1, host2)
-
- assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC),
- "User1 from host1 should not have READ access to any topic when no ACL
exists")
-
- addAcls(authorizer, Set(allowHost1), resource1)
- assertTrue(authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC),
- "User1 from host1 should now have READ access to at least one topic")
-
- addAcls(authorizer, Set(denyAllHost), resource1)
- assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC),
- "User1 from host1 now shouldn't have READ access to any topic")
-
- addAcls(authorizer, Set(denyHost1), resource2)
- assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC),
- "User1 from host1 still should not have READ access to any topic")
- assertFalse(authorizeByResourceType(authorizer, u1h2Context, READ,
ResourceType.TOPIC),
- "User1 from host2 should not have READ access to any topic")
-
- addAcls(authorizer, Set(allowAllHost), resource2)
- assertTrue(authorizeByResourceType(authorizer, u1h2Context, READ,
ResourceType.TOPIC),
- "User1 from host2 should now have READ access to at least one topic")
-
- addAcls(authorizer, Set(denyAllHost), resource2)
- assertFalse(authorizeByResourceType(authorizer, u1h2Context, READ,
ResourceType.TOPIC),
- "User1 from host2 now shouldn't have READ access to any topic")
- }
-
- @Test
- def testAuthorizeByResourceTypeWithAllPrincipalAce(): Unit = {
- val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
- val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user2")
- val allUser = AclEntry.WILDCARD_PRINCIPAL_STRING
- val host1 = InetAddress.getByName("192.168.1.1")
- val resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(),
LITERAL)
- val resource2 = new ResourcePattern(TOPIC, "sb2" + UUID.randomUUID(),
LITERAL)
- val allowUser1 = new AccessControlEntry(user1.toString,
host1.getHostAddress, READ, ALLOW)
- val denyUser1 = new AccessControlEntry(user1.toString,
host1.getHostAddress, READ, DENY)
- val denyAllUser = new AccessControlEntry(allUser, host1.getHostAddress,
READ, DENY)
- val allowAllUser = new AccessControlEntry(allUser, host1.getHostAddress,
READ, ALLOW)
- val u1h1Context = newRequestContext(user1, host1)
- val u2h1Context = newRequestContext(user2, host1)
-
- assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC),
- "User1 from host1 should not have READ access to any topic when no ACL
exists")
-
- addAcls(authorizer, Set(allowUser1), resource1)
- assertTrue(authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC),
- "User1 from host1 should now have READ access to at least one topic")
-
- addAcls(authorizer, Set(denyAllUser), resource1)
- assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC),
- "User1 from host1 now shouldn't have READ access to any topic")
-
- addAcls(authorizer, Set(denyUser1), resource2)
- assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC),
- "User1 from host1 still should not have READ access to any topic")
- assertFalse(authorizeByResourceType(authorizer, u2h1Context, READ,
ResourceType.TOPIC),
- "User2 from host1 should not have READ access to any topic")
-
- addAcls(authorizer, Set(allowAllUser), resource2)
- assertTrue(authorizeByResourceType(authorizer, u2h1Context, READ,
ResourceType.TOPIC),
- "User2 from host1 should now have READ access to at least one topic")
-
- addAcls(authorizer, Set(denyAllUser), resource2)
- assertFalse(authorizeByResourceType(authorizer, u2h1Context, READ,
ResourceType.TOPIC),
- "User2 from host1 now shouldn't have READ access to any topic")
- }
-
- @Test
- def testAuthorizeByResourceTypeSuperUserHasAccess(): Unit = {
- val denyAllAce = new AccessControlEntry(WILDCARD_PRINCIPAL_STRING,
WILDCARD_HOST, AclOperation.ALL, DENY)
- val superUser1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
superUserName)
- val host1 = InetAddress.getByName("192.0.4.4")
- val allTopicsResource = new ResourcePattern(TOPIC, WILDCARD_RESOURCE,
LITERAL)
- val clusterResource = new ResourcePattern(CLUSTER, WILDCARD_RESOURCE,
LITERAL)
- val groupResource = new ResourcePattern(GROUP, WILDCARD_RESOURCE, LITERAL)
- val transactionIdResource = new ResourcePattern(TRANSACTIONAL_ID,
WILDCARD_RESOURCE, LITERAL)
-
- addAcls(authorizer, Set(denyAllAce), allTopicsResource)
- addAcls(authorizer, Set(denyAllAce), clusterResource)
- addAcls(authorizer, Set(denyAllAce), groupResource)
- addAcls(authorizer, Set(denyAllAce), transactionIdResource)
-
- val superUserContext = newRequestContext(superUser1, host1)
-
- assertTrue(authorizeByResourceType(authorizer, superUserContext, READ,
ResourceType.TOPIC),
- "superuser always has access, no matter what acls.")
- assertTrue(authorizeByResourceType(authorizer, superUserContext, READ,
ResourceType.CLUSTER),
- "superuser always has access, no matter what acls.")
- assertTrue(authorizeByResourceType(authorizer, superUserContext, READ,
ResourceType.GROUP),
- "superuser always has access, no matter what acls.")
- assertTrue(authorizeByResourceType(authorizer, superUserContext, READ,
ResourceType.TRANSACTIONAL_ID),
- "superuser always has access, no matter what acls.")
- }
-
- def newRequestContext(principal: KafkaPrincipal, clientAddress: InetAddress,
apiKey: ApiKeys = ApiKeys.PRODUCE): RequestContext = {
- val securityProtocol = SecurityProtocol.SASL_PLAINTEXT
- val header = new RequestHeader(apiKey, 2, "", 1) //ApiKeys apiKey, short
version, String clientId, int correlation
- new RequestContext(header, "", clientAddress, principal,
ListenerName.forSecurityProtocol(securityProtocol),
- securityProtocol, ClientInformation.EMPTY, false)
- }
-
- def authorizeByResourceType(authorizer: Authorizer, requestContext:
RequestContext, operation: AclOperation, resourceType: ResourceType) : Boolean
= {
- authorizer.authorizeByResourceType(requestContext, operation,
resourceType) == AuthorizationResult.ALLOWED
- }
-
- def addAcls(authorizer: Authorizer, aces: Set[AccessControlEntry],
resourcePattern: ResourcePattern): Unit = {
- val bindings = aces.map { ace => new AclBinding(resourcePattern, ace) }
- authorizer.createAcls(requestContext, bindings.toList.asJava).asScala
- .map(_.toCompletableFuture.get)
- .foreach { result => result.exception.ifPresent { e => throw e } }
- }
-
- def removeAcls(authorizer: Authorizer, aces: Set[AccessControlEntry],
resourcePattern: ResourcePattern): Boolean = {
- val bindings = if (aces.isEmpty)
- Set(new AclBindingFilter(resourcePattern.toFilter,
AccessControlEntryFilter.ANY) )
- else
- aces.map { ace => new AclBinding(resourcePattern, ace).toFilter }
- authorizer.deleteAcls(requestContext, bindings.toList.asJava).asScala
- .map(_.toCompletableFuture.get)
- .forall { result =>
- result.exception.ifPresent { e => throw e }
- result.aclBindingDeleteResults.forEach { r =>
- r.exception.ifPresent { e => throw e }
- }
- !result.aclBindingDeleteResults.isEmpty
- }
- }
-
-}
diff --git
a/server-common/src/testFixtures/java/org/apache/kafka/server/util/ServerTestUtils.java
b/server-common/src/testFixtures/java/org/apache/kafka/server/util/ServerTestUtils.java
index f3898848768..8d1bad658e6 100644
---
a/server-common/src/testFixtures/java/org/apache/kafka/server/util/ServerTestUtils.java
+++
b/server-common/src/testFixtures/java/org/apache/kafka/server/util/ServerTestUtils.java
@@ -17,10 +17,21 @@
package org.apache.kafka.server.util;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+import org.apache.kafka.test.TestUtils;
import com.yammer.metrics.core.Gauge;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
public final class ServerTestUtils {
/**
@@ -46,4 +57,28 @@ public final class ServerTestUtils {
return (Number) gauge.value();
}
+ /**
+ * Wait until the ACLs for the given resource match the expected set.
+ *
+ * @param expected the expected set of access control
entries.
+ * @param authorizer the authorizer to query.
+ * @param resource the resource pattern to filter ACLs on.
+ * @param accessControlEntryFilter additional filter for the access
control entries.
+ */
+ public static void waitAndVerifyAcls(Set<AccessControlEntry> expected,
+ Authorizer authorizer,
+ ResourcePattern resource,
+ AccessControlEntryFilter
accessControlEntryFilter) throws InterruptedException {
+ String newLine = System.lineSeparator();
+ String delimiter = newLine + "\t";
+ AclBindingFilter filter = new AclBindingFilter(resource.toFilter(),
accessControlEntryFilter);
+ TestUtils.waitForCondition(() ->
StreamSupport.stream(authorizer.acls(filter).spliterator(), false)
+ .map(AclBinding::entry)
+ .collect(Collectors.toSet()).equals(expected),
+ 45000,
+ () -> "expected acls:" +
expected.stream().map(Object::toString).collect(Collectors.joining(delimiter,
delimiter, newLine))
+ + "but got:" +
StreamSupport.stream(authorizer.acls(filter).spliterator(), false)
+ .map(b ->
b.entry().toString()).collect(Collectors.joining(delimiter, delimiter,
newLine)));
+ }
+
}
diff --git
a/server/src/test/java/org/apache/kafka/security/authorizer/AuthorizerTest.java
b/server/src/test/java/org/apache/kafka/security/authorizer/AuthorizerTest.java
new file mode 100644
index 00000000000..926d185548d
--- /dev/null
+++
b/server/src/test/java/org/apache/kafka/security/authorizer/AuthorizerTest.java
@@ -0,0 +1,1069 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security.authorizer;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.internals.PluginMetricsImpl;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.internals.SecurityUtils;
+import org.apache.kafka.controller.MockAclMutator;
+import org.apache.kafka.metadata.authorizer.AuthorizerTestServerInfo;
+import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
+import org.apache.kafka.raft.KRaftConfigs;
+import org.apache.kafka.server.authorizer.AclCreateResult;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.apache.kafka.server.authorizer.Authorizer;
+import org.apache.kafka.server.util.ServerTestUtils;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletionStage;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.apache.kafka.common.acl.AclOperation.ALL;
+import static org.apache.kafka.common.acl.AclOperation.ALTER;
+import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.CLUSTER_ACTION;
+import static org.apache.kafka.common.acl.AclOperation.CREATE;
+import static org.apache.kafka.common.acl.AclOperation.CREATE_TOKENS;
+import static org.apache.kafka.common.acl.AclOperation.DELETE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_TOKENS;
+import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE;
+import static org.apache.kafka.common.acl.AclOperation.READ;
+import static org.apache.kafka.common.acl.AclOperation.TWO_PHASE_COMMIT;
+import static org.apache.kafka.common.acl.AclOperation.WRITE;
+import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
+import static org.apache.kafka.common.acl.AclPermissionType.DENY;
+import static org.apache.kafka.common.resource.PatternType.LITERAL;
+import static org.apache.kafka.common.resource.PatternType.MATCH;
+import static org.apache.kafka.common.resource.PatternType.PREFIXED;
+import static org.apache.kafka.common.resource.Resource.CLUSTER_NAME;
+import static
org.apache.kafka.common.resource.ResourcePattern.WILDCARD_RESOURCE;
+import static org.apache.kafka.common.resource.ResourceType.CLUSTER;
+import static org.apache.kafka.common.resource.ResourceType.GROUP;
+import static org.apache.kafka.common.resource.ResourceType.TOPIC;
+import static org.apache.kafka.common.resource.ResourceType.TRANSACTIONAL_ID;
+import static org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST;
+import static
org.apache.kafka.security.authorizer.AclEntry.WILDCARD_PRINCIPAL_STRING;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class AuthorizerTest {
+
+ private final Endpoint plaintext = new Endpoint("PLAINTEXT",
SecurityProtocol.PLAINTEXT, "127.0.0.1", 9020);
+
+ private final AccessControlEntry allowReadAcl = new
AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, READ, ALLOW);
+ private final AccessControlEntry allowWriteAcl = new
AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, WRITE, ALLOW);
+ private final AccessControlEntry denyReadAcl = new
AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, READ, DENY);
+
+ private final ResourcePattern wildCardResource = new
ResourcePattern(TOPIC, WILDCARD_RESOURCE, LITERAL);
+ private final ResourcePattern prefixedResource = new
ResourcePattern(TOPIC, "foo", PREFIXED);
+ private final ResourcePattern clusterResource = new
ResourcePattern(CLUSTER, CLUSTER_NAME, LITERAL);
+ private final KafkaPrincipal wildcardPrincipal =
SecurityUtils.parseKafkaPrincipal(WILDCARD_PRINCIPAL_STRING);
+
+ private final String username = "alice";
+ private final KafkaPrincipal principal = new
KafkaPrincipal(KafkaPrincipal.USER_TYPE, username);
+ private final List<Metrics> metricsInstances = new ArrayList<>();
+ private final List<PluginMetricsImpl> pluginMetricsInstances = new
ArrayList<>();
+
+ private Authorizer authorizer;
+ private RequestContext requestContext;
+ private ResourcePattern resource;
+
+ static class CustomPrincipal extends KafkaPrincipal {
+
+ public CustomPrincipal(String principalType, String name) {
+ super(principalType, name);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
+ }
+
+ @BeforeEach
+ public void setup() throws Exception {
+ requestContext = newRequestContext(principal,
InetAddress.getByName("192.168.0.1"));
+ authorizer = createAuthorizer(configs());
+ resource = new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(),
LITERAL);
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ authorizer.close();
+ for (PluginMetricsImpl pluginMetric : pluginMetricsInstances) {
+ pluginMetric.close();
+ }
+ for (Metrics metrics : metricsInstances) {
+ metrics.close();
+ }
+ }
+
+ private Map<String, Object> configs() {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(StandardAuthorizer.SUPER_USERS_CONFIG, "User:superuser1;
User:superuser2");
+ configs.put(KRaftConfigs.NODE_ID_CONFIG, "0");
+ return configs;
+ }
+
+ @Test
+ public void testAuthorizeByResourceTypeMultipleAddAndRemove() throws
Exception {
+ KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
"user1");
+ InetAddress host1 = InetAddress.getByName("192.168.1.1");
+ ResourcePattern resource1 = new ResourcePattern(TOPIC, "sb1" +
UUID.randomUUID(), LITERAL);
+ AccessControlEntry denyRead = new AccessControlEntry(user1.toString(),
host1.getHostAddress(), READ, DENY);
+ AccessControlEntry allowRead = new
AccessControlEntry(user1.toString(), host1.getHostAddress(), READ, ALLOW);
+ RequestContext u1h1Context = newRequestContext(user1, host1);
+
+ for (int i = 0; i < 10; i++) {
+ assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
TOPIC),
+ "User1 from host1 should not have READ access to any topic
when no ACL exists");
+
+ addAcls(authorizer, Set.of(allowRead), resource1);
+ assertTrue(authorizeByResourceType(authorizer, u1h1Context, READ,
TOPIC),
+ "User1 from host1 now should have READ access to at least
one topic");
+
+ for (int j = 0; j < 10; j++) {
+ addAcls(authorizer, Set.of(denyRead), resource1);
+ assertFalse(authorizeByResourceType(authorizer, u1h1Context,
READ, TOPIC),
+ "User1 from host1 now should not have READ access to
any topic");
+
+ removeAcls(authorizer, Set.of(denyRead), resource1);
+ addAcls(authorizer, Set.of(allowRead), resource1);
+ assertTrue(authorizeByResourceType(authorizer, u1h1Context,
READ, TOPIC),
+ "User1 from host1 now should have READ access to at
least one topic");
+ }
+
+ removeAcls(authorizer, Set.of(allowRead), resource1);
+ assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
TOPIC),
+ "User1 from host1 now should not have READ access to any
topic");
+ }
+ }
+
+ @Test
+ public void
testAuthorizeByResourceTypeIsolationUnrelatedDenyWontDominateAllow() throws
Exception {
+ KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
"user1");
+ KafkaPrincipal user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
"user2");
+ InetAddress host1 = InetAddress.getByName("192.168.1.1");
+ InetAddress host2 = InetAddress.getByName("192.168.1.2");
+ ResourcePattern resource1 = new ResourcePattern(TOPIC, "sb1" +
UUID.randomUUID(), LITERAL);
+ ResourcePattern resource2 = new ResourcePattern(TOPIC, "sb2" +
UUID.randomUUID(), LITERAL);
+ ResourcePattern resource3 = new ResourcePattern(GROUP, "s", PREFIXED);
+
+ AccessControlEntry acl1 = new AccessControlEntry(user1.toString(),
host1.getHostAddress(), READ, DENY);
+ AccessControlEntry acl2 = new AccessControlEntry(user2.toString(),
host1.getHostAddress(), READ, DENY);
+ AccessControlEntry acl3 = new AccessControlEntry(user1.toString(),
host2.getHostAddress(), WRITE, DENY);
+ AccessControlEntry acl4 = new AccessControlEntry(user1.toString(),
host2.getHostAddress(), READ, DENY);
+ AccessControlEntry acl5 = new AccessControlEntry(user1.toString(),
host2.getHostAddress(), READ, DENY);
+ AccessControlEntry acl6 = new AccessControlEntry(user2.toString(),
host2.getHostAddress(), READ, DENY);
+ AccessControlEntry acl7 = new AccessControlEntry(user1.toString(),
host2.getHostAddress(), READ, ALLOW);
+
+ addAcls(authorizer, Set.of(acl1, acl2, acl3, acl6, acl7), resource1);
+ addAcls(authorizer, Set.of(acl4), resource2);
+ addAcls(authorizer, Set.of(acl5), resource3);
+
+ RequestContext u1h1Context = newRequestContext(user1, host1);
+ RequestContext u1h2Context = newRequestContext(user1, host2);
+
+ assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
TOPIC),
+ "User1 from host1 should not have READ access to any topic");
+ assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
GROUP),
+ "User1 from host1 should not have READ access to any consumer
group");
+ assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
TRANSACTIONAL_ID),
+ "User1 from host1 should not have READ access to any
transactional id");
+ assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
CLUSTER),
+ "User1 from host1 should not have READ access to the cluster");
+ assertTrue(authorizeByResourceType(authorizer, u1h2Context, READ,
TOPIC),
+ "User1 from host2 should have READ access to at least one
topic");
+ }
+
+ @Test
+ public void testAuthorizeByResourceTypeDenyTakesPrecedence() throws
Exception {
+ KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
"user1");
+ InetAddress host1 = InetAddress.getByName("192.168.1.1");
+ ResourcePattern resource1 = new ResourcePattern(TOPIC, "sb1" +
UUID.randomUUID(), LITERAL);
+
+ RequestContext u1h1Context = newRequestContext(user1, host1);
+ AccessControlEntry acl1 = new AccessControlEntry(user1.toString(),
host1.getHostAddress(), WRITE, ALLOW);
+ AccessControlEntry acl2 = new AccessControlEntry(user1.toString(),
host1.getHostAddress(), WRITE, DENY);
+
+ addAcls(authorizer, Set.of(acl1), resource1);
+ assertTrue(authorizeByResourceType(authorizer, u1h1Context, WRITE,
TOPIC),
+ "User1 from host1 should have WRITE access to at least one
topic");
+
+ addAcls(authorizer, Set.of(acl2), resource1);
+ assertFalse(authorizeByResourceType(authorizer, u1h1Context, WRITE,
TOPIC),
+ "User1 from host1 should not have WRITE access to any topic");
+ }
+
+ @Test
+ public void testAuthorizeByResourceTypePrefixedResourceDenyDominate()
throws Exception {
+ KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
"user1");
+ InetAddress host1 = InetAddress.getByName("192.168.1.1");
+ ResourcePattern a = new ResourcePattern(GROUP, "a", PREFIXED);
+ ResourcePattern ab = new ResourcePattern(GROUP, "ab", PREFIXED);
+ ResourcePattern abc = new ResourcePattern(GROUP, "abc", PREFIXED);
+ ResourcePattern abcd = new ResourcePattern(GROUP, "abcd", PREFIXED);
+ ResourcePattern abcde = new ResourcePattern(GROUP, "abcde", PREFIXED);
+
+ RequestContext u1h1Context = newRequestContext(user1, host1);
+ AccessControlEntry allowAce = new AccessControlEntry(user1.toString(),
host1.getHostAddress(), READ, ALLOW);
+ AccessControlEntry denyAce = new AccessControlEntry(user1.toString(),
host1.getHostAddress(), READ, DENY);
+
+ addAcls(authorizer, Set.of(allowAce), abcde);
+ assertTrue(authorizeByResourceType(authorizer, u1h1Context, READ,
GROUP),
+ "User1 from host1 should have READ access to at least one
group");
+
+ addAcls(authorizer, Set.of(denyAce), abcd);
+ assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
GROUP),
+ "User1 from host1 now should not have READ access to any
group");
+
+ addAcls(authorizer, Set.of(allowAce), abc);
+ assertTrue(authorizeByResourceType(authorizer, u1h1Context, READ,
GROUP),
+ "User1 from host1 now should have READ access to any group");
+
+ addAcls(authorizer, Set.of(denyAce), a);
+ assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
GROUP),
+ "User1 from host1 now should not have READ access to any
group");
+
+ addAcls(authorizer, Set.of(allowAce), ab);
+ assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
GROUP),
+ "User1 from host1 still should not have READ access to any
group");
+ }
+
+ @Test
+ public void testAuthorizeByResourceTypeWildcardResourceDenyDominate()
throws Exception {
+ KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
"user1");
+ InetAddress host1 = InetAddress.getByName("192.168.1.1");
+ ResourcePattern wildcard = new ResourcePattern(GROUP,
WILDCARD_RESOURCE, LITERAL);
+ ResourcePattern prefixed = new ResourcePattern(GROUP, "hello",
PREFIXED);
+ ResourcePattern literal = new ResourcePattern(GROUP, "aloha", LITERAL);
+
+ RequestContext u1h1Context = newRequestContext(user1, host1);
+ AccessControlEntry allowAce = new AccessControlEntry(user1.toString(),
host1.getHostAddress(), WRITE, ALLOW);
+ AccessControlEntry denyAce = new AccessControlEntry(user1.toString(),
host1.getHostAddress(), WRITE, DENY);
+
+ addAcls(authorizer, Set.of(allowAce), prefixed);
+ assertTrue(authorizeByResourceType(authorizer, u1h1Context, WRITE,
GROUP),
+ "User1 from host1 should have WRITE access to at least one
group");
+
+ addAcls(authorizer, Set.of(denyAce), wildcard);
+ assertFalse(authorizeByResourceType(authorizer, u1h1Context, WRITE,
GROUP),
+ "User1 from host1 now should not have WRITE access to any
group");
+
+ addAcls(authorizer, Set.of(allowAce), wildcard);
+ assertFalse(authorizeByResourceType(authorizer, u1h1Context, WRITE,
GROUP),
+ "User1 from host1 still should not have WRITE access to any
group");
+
+ addAcls(authorizer, Set.of(allowAce), literal);
+ assertFalse(authorizeByResourceType(authorizer, u1h1Context, WRITE,
GROUP),
+ "User1 from host1 still should not have WRITE access to any
group");
+ }
+
+ @Test
+ public void testAuthorizeByResourceTypeWithAllOperationAce() throws
Exception {
+ KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
"user1");
+ InetAddress host1 = InetAddress.getByName("192.168.1.1");
+ ResourcePattern resource1 = new ResourcePattern(TOPIC, "sb1" +
UUID.randomUUID(), LITERAL);
+ AccessControlEntry denyAll = new AccessControlEntry(user1.toString(),
host1.getHostAddress(), ALL, DENY);
+ AccessControlEntry allowAll = new AccessControlEntry(user1.toString(),
host1.getHostAddress(), ALL, ALLOW);
+ AccessControlEntry denyWrite = new
AccessControlEntry(user1.toString(), host1.getHostAddress(), WRITE, DENY);
+ RequestContext u1h1Context = newRequestContext(user1, host1);
+
+ assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
TOPIC),
+ "User1 from host1 should not have READ access to any topic
when no ACL exists");
+
+ addAcls(authorizer, Set.of(denyWrite, allowAll), resource1);
+ assertTrue(authorizeByResourceType(authorizer, u1h1Context, READ,
TOPIC),
+ "User1 from host1 now should have READ access to at least one
topic");
+
+ addAcls(authorizer, Set.of(denyAll), resource1);
+ assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
TOPIC),
+ "User1 from host1 now should not have READ access to any
topic");
+ }
+
+ @Test
+ public void testAuthorizeByResourceTypeWithAllHostAce() throws Exception {
+ KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
"user1");
+ InetAddress host1 = InetAddress.getByName("192.168.1.1");
+ InetAddress host2 = InetAddress.getByName("192.168.1.2");
+ String allHost = WILDCARD_HOST;
+ ResourcePattern resource1 = new ResourcePattern(TOPIC, "sb1" +
UUID.randomUUID(), LITERAL);
+ ResourcePattern resource2 = new ResourcePattern(TOPIC, "sb2" +
UUID.randomUUID(), LITERAL);
+ AccessControlEntry allowHost1 = new
AccessControlEntry(user1.toString(), host1.getHostAddress(), READ, ALLOW);
+ AccessControlEntry denyHost1 = new
AccessControlEntry(user1.toString(), host1.getHostAddress(), READ, DENY);
+ AccessControlEntry denyAllHost = new
AccessControlEntry(user1.toString(), allHost, READ, DENY);
+ AccessControlEntry allowAllHost = new
AccessControlEntry(user1.toString(), allHost, READ, ALLOW);
+ RequestContext u1h1Context = newRequestContext(user1, host1);
+ RequestContext u1h2Context = newRequestContext(user1, host2);
+
+ assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
TOPIC),
+ "User1 from host1 should not have READ access to any topic
when no ACL exists");
+
+ addAcls(authorizer, Set.of(allowHost1), resource1);
+ assertTrue(authorizeByResourceType(authorizer, u1h1Context, READ,
TOPIC),
+ "User1 from host1 should now have READ access to at least one
topic");
+
+ addAcls(authorizer, Set.of(denyAllHost), resource1);
+ assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
TOPIC),
+ "User1 from host1 now shouldn't have READ access to any
topic");
+
+ addAcls(authorizer, Set.of(denyHost1), resource2);
+ assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
TOPIC),
+ "User1 from host1 still should not have READ access to any
topic");
+ assertFalse(authorizeByResourceType(authorizer, u1h2Context, READ,
TOPIC),
+ "User1 from host2 should not have READ access to any topic");
+
+ addAcls(authorizer, Set.of(allowAllHost), resource2);
+ assertTrue(authorizeByResourceType(authorizer, u1h2Context, READ,
TOPIC),
+ "User1 from host2 should now have READ access to at least one
topic");
+
+ addAcls(authorizer, Set.of(denyAllHost), resource2);
+ assertFalse(authorizeByResourceType(authorizer, u1h2Context, READ,
TOPIC),
+ "User1 from host2 now shouldn't have READ access to any
topic");
+ }
+
+ @Test
+ public void testAuthorizeByResourceTypeWithAllPrincipalAce() throws
Exception {
+ KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
"user1");
+ KafkaPrincipal user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
"user2");
+ String allUser = WILDCARD_PRINCIPAL_STRING;
+ InetAddress host1 = InetAddress.getByName("192.168.1.1");
+ ResourcePattern resource1 = new ResourcePattern(TOPIC, "sb1" +
UUID.randomUUID(), LITERAL);
+ ResourcePattern resource2 = new ResourcePattern(TOPIC, "sb2" +
UUID.randomUUID(), LITERAL);
+ AccessControlEntry allowUser1 = new
AccessControlEntry(user1.toString(), host1.getHostAddress(), READ, ALLOW);
+ AccessControlEntry denyUser1 = new
AccessControlEntry(user1.toString(), host1.getHostAddress(), READ, DENY);
+ AccessControlEntry denyAllUser = new AccessControlEntry(allUser,
host1.getHostAddress(), READ, DENY);
+ AccessControlEntry allowAllUser = new AccessControlEntry(allUser,
host1.getHostAddress(), READ, ALLOW);
+ RequestContext u1h1Context = newRequestContext(user1, host1);
+ RequestContext u2h1Context = newRequestContext(user2, host1);
+
+ assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
TOPIC),
+ "User1 from host1 should not have READ access to any topic
when no ACL exists");
+
+ addAcls(authorizer, Set.of(allowUser1), resource1);
+ assertTrue(authorizeByResourceType(authorizer, u1h1Context, READ,
TOPIC),
+ "User1 from host1 should now have READ access to at least one
topic");
+
+ addAcls(authorizer, Set.of(denyAllUser), resource1);
+ assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
TOPIC),
+ "User1 from host1 now shouldn't have READ access to any
topic");
+
+ addAcls(authorizer, Set.of(denyUser1), resource2);
+ assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ,
TOPIC),
+ "User1 from host1 still should not have READ access to any
topic");
+ assertFalse(authorizeByResourceType(authorizer, u2h1Context, READ,
TOPIC),
+ "User2 from host1 should not have READ access to any topic");
+
+ addAcls(authorizer, Set.of(allowAllUser), resource2);
+ assertTrue(authorizeByResourceType(authorizer, u2h1Context, READ,
TOPIC),
+ "User2 from host1 should now have READ access to at least one
topic");
+
+ addAcls(authorizer, Set.of(denyAllUser), resource2);
+ assertFalse(authorizeByResourceType(authorizer, u2h1Context, READ,
TOPIC),
+ "User2 from host1 now shouldn't have READ access to any
topic");
+ }
+
+ @Test
+ public void testAuthorizeByResourceTypeSuperUserHasAccess() throws
Exception {
+ AccessControlEntry denyAllAce = new
AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, AclOperation.ALL,
DENY);
+ String superUserName = "superuser1";
+ KafkaPrincipal superUser1 = new
KafkaPrincipal(KafkaPrincipal.USER_TYPE, superUserName);
+ InetAddress host1 = InetAddress.getByName("192.0.4.4");
+ ResourcePattern allTopicsResource = new ResourcePattern(TOPIC,
WILDCARD_RESOURCE, LITERAL);
+ ResourcePattern clusterResource = new ResourcePattern(CLUSTER,
WILDCARD_RESOURCE, LITERAL);
+ ResourcePattern groupResource = new ResourcePattern(GROUP,
WILDCARD_RESOURCE, LITERAL);
+ ResourcePattern transactionIdResource = new
ResourcePattern(TRANSACTIONAL_ID, WILDCARD_RESOURCE, LITERAL);
+
+ addAcls(authorizer, Set.of(denyAllAce), allTopicsResource);
+ addAcls(authorizer, Set.of(denyAllAce), clusterResource);
+ addAcls(authorizer, Set.of(denyAllAce), groupResource);
+ addAcls(authorizer, Set.of(denyAllAce), transactionIdResource);
+
+ RequestContext superUserContext = newRequestContext(superUser1, host1);
+
+ assertTrue(authorizeByResourceType(authorizer, superUserContext, READ,
TOPIC),
+ "superuser always has access, no matter what acls.");
+ assertTrue(authorizeByResourceType(authorizer, superUserContext, READ,
CLUSTER),
+ "superuser always has access, no matter what acls.");
+ assertTrue(authorizeByResourceType(authorizer, superUserContext, READ,
GROUP),
+ "superuser always has access, no matter what acls.");
+ assertTrue(authorizeByResourceType(authorizer, superUserContext, READ,
TRANSACTIONAL_ID),
+ "superuser always has access, no matter what acls.");
+ }
+
+ @Test
+ public void testAuthorizeThrowsOnNonLiteralResource() {
+ assertThrows(IllegalArgumentException.class, () ->
authorize(authorizer, requestContext, READ,
+ new ResourcePattern(TOPIC, "something", PREFIXED)));
+ }
+
+ @Test
+ public void testAuthorizeWithEmptyResourceName() throws Exception {
+ assertFalse(authorize(authorizer, requestContext, READ, new
ResourcePattern(GROUP, "", LITERAL)));
+ addAcls(authorizer, Set.of(allowReadAcl), new ResourcePattern(GROUP,
WILDCARD_RESOURCE, LITERAL));
+ assertTrue(authorize(authorizer, requestContext, READ, new
ResourcePattern(GROUP, "", LITERAL)));
+ }
+
+ // Authorizing the empty resource is not supported because empty resource
name is invalid.
+ @Test
+ public void testEmptyAclThrowsException() {
+ assertThrows(ApiException.class,
+ () -> addAcls(authorizer, Set.of(allowReadAcl), new
ResourcePattern(GROUP, "", LITERAL)));
+ }
+
+ @Test
+ public void testTopicAcl() throws Exception {
+ KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
username);
+ KafkaPrincipal user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
"rob");
+ KafkaPrincipal user3 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
"batman");
+ InetAddress host1 = InetAddress.getByName("192.168.1.1");
+ InetAddress host2 = InetAddress.getByName("192.168.1.2");
+
+ //user1 has READ access from host1 and host2.
+ AccessControlEntry acl1 = new AccessControlEntry(user1.toString(),
host1.getHostAddress(), READ, ALLOW);
+ AccessControlEntry acl2 = new AccessControlEntry(user1.toString(),
host2.getHostAddress(), READ, ALLOW);
+
+ //user1 does not have READ access from host1.
+ AccessControlEntry acl3 = new AccessControlEntry(user1.toString(),
host1.getHostAddress(), READ, DENY);
+
+ //user1 has WRITE access from host1 only.
+ AccessControlEntry acl4 = new AccessControlEntry(user1.toString(),
host1.getHostAddress(), WRITE, ALLOW);
+
+ //user1 has DESCRIBE access from all hosts.
+ AccessControlEntry acl5 = new AccessControlEntry(user1.toString(),
WILDCARD_HOST, DESCRIBE, ALLOW);
+
+ //user2 has READ access from all hosts.
+ AccessControlEntry acl6 = new AccessControlEntry(user2.toString(),
WILDCARD_HOST, READ, ALLOW);
+
+ //user3 has WRITE access from all hosts.
+ AccessControlEntry acl7 = new AccessControlEntry(user3.toString(),
WILDCARD_HOST, WRITE, ALLOW);
+
+ Set<AccessControlEntry> acls = Set.of(acl1, acl2, acl3, acl4, acl5,
acl6, acl7);
+
+ changeAclAndVerify(Set.of(), acls, Set.of());
+
+ RequestContext host1Context = newRequestContext(user1, host1);
+ RequestContext host2Context = newRequestContext(user1, host2);
+
+ assertTrue(authorize(authorizer, host2Context, READ, resource), "User1
should have READ access from host2");
+ assertFalse(authorize(authorizer, host1Context, READ, resource),
"User1 should not have READ access from host1 due to denyAcl");
+ assertTrue(authorize(authorizer, host1Context, WRITE, resource),
"User1 should have WRITE access from host1");
+ assertFalse(authorize(authorizer, host2Context, WRITE, resource),
"User1 should not have WRITE access from host2 as no allow acl is defined");
+ assertTrue(authorize(authorizer, host1Context, DESCRIBE, resource),
"User1 should have DESCRIBE access from host1");
+ assertTrue(authorize(authorizer, host2Context, DESCRIBE, resource),
"User1 should have DESCRIBE access from host2");
+ assertFalse(authorize(authorizer, host1Context, ALTER, resource),
"User1 should not have edit access from host1");
+ assertFalse(authorize(authorizer, host2Context, ALTER, resource),
"User1 should not have edit access from host2");
+
+ //test if user has READ or WRITE access they also get DESCRIBE access
+ RequestContext user2Context = newRequestContext(user2, host1);
+ RequestContext user3Context = newRequestContext(user3, host1);
+ assertTrue(authorize(authorizer, user2Context, DESCRIBE, resource),
"User2 should have DESCRIBE access from host1");
+ assertTrue(authorize(authorizer, user3Context, DESCRIBE, resource),
"User3 should have DESCRIBE access from host1");
+ assertTrue(authorize(authorizer, user2Context, READ, resource), "User2
should have READ access from host1");
+ assertTrue(authorize(authorizer, user3Context, WRITE, resource),
"User3 should have WRITE access from host1");
+ }
+
+ /**
+ * CustomPrincipals should be compared with their principal type and name
+ */
+ @Test
+ public void testAllowAccessWithCustomPrincipal() throws Exception {
+ KafkaPrincipal user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
username);
+ CustomPrincipal customUserPrincipal = new
CustomPrincipal(KafkaPrincipal.USER_TYPE, username);
+ InetAddress host1 = InetAddress.getByName("192.168.1.1");
+ InetAddress host2 = InetAddress.getByName("192.168.1.2");
+
+ // user has READ access from host2 but not from host1
+ AccessControlEntry acl1 = new AccessControlEntry(user.toString(),
host1.getHostAddress(), READ, DENY);
+ AccessControlEntry acl2 = new AccessControlEntry(user.toString(),
host2.getHostAddress(), READ, ALLOW);
+ Set<AccessControlEntry> acls = Set.of(acl1, acl2);
+ changeAclAndVerify(Set.of(), acls, Set.of());
+
+ RequestContext host1Context = newRequestContext(customUserPrincipal,
host1);
+ RequestContext host2Context = newRequestContext(customUserPrincipal,
host2);
+
+ assertTrue(authorize(authorizer, host2Context, READ, resource), "User1
should have READ access from host2");
+ assertFalse(authorize(authorizer, host1Context, READ, resource),
"User1 should not have READ access from host1 due to denyAcl");
+ }
+
+ @Test
+ public void testDenyTakesPrecedence() throws Exception {
+ KafkaPrincipal user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
username);
+ InetAddress host = InetAddress.getByName("192.168.2.1");
+ RequestContext session = newRequestContext(user, host);
+
+ AccessControlEntry allowAll = new
AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, AclOperation.ALL,
ALLOW);
+ AccessControlEntry denyAcl = new AccessControlEntry(user.toString(),
host.getHostAddress(), AclOperation.ALL, DENY);
+ Set<AccessControlEntry> acls = Set.of(allowAll, denyAcl);
+
+ changeAclAndVerify(Set.of(), acls, Set.of());
+
+ assertFalse(authorize(authorizer, session, READ, resource), "deny
should take precedence over allow.");
+ }
+
+ @Test
+ public void testAllowAllAccess() throws Exception {
+ AccessControlEntry allowAllAcl = new
AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, AclOperation.ALL,
ALLOW);
+
+ changeAclAndVerify(Set.of(), Set.of(allowAllAcl), Set.of());
+
+ RequestContext context = newRequestContext(new
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "random"),
InetAddress.getByName("192.0.4.4"));
+ assertTrue(authorize(authorizer, context, READ, resource), "allow all
acl should allow access to all.");
+ }
+
+ @Test
+ public void testSuperUserHasAccess() throws Exception {
+ AccessControlEntry denyAllAcl = new
AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, AclOperation.ALL,
DENY);
+
+ changeAclAndVerify(Set.of(), Set.of(denyAllAcl), Set.of());
+
+ RequestContext session1 = newRequestContext(new
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"),
InetAddress.getByName("192.0.4.4"));
+ RequestContext session2 = newRequestContext(new
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser2"),
InetAddress.getByName("192.0.4.4"));
+
+ assertTrue(authorize(authorizer, session1, READ, resource), "superuser
always has access, no matter what acls.");
+ assertTrue(authorize(authorizer, session2, READ, resource), "superuser
always has access, no matter what acls.");
+ }
+
+ /**
+ * CustomPrincipals should be compared with their principal type and name
+ */
+ @Test
+ public void testSuperUserWithCustomPrincipalHasAccess() throws Exception {
+ AccessControlEntry denyAllAcl = new
AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, AclOperation.ALL,
DENY);
+ changeAclAndVerify(Set.of(), Set.of(denyAllAcl), Set.of());
+
+ RequestContext session = newRequestContext(new
CustomPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"),
InetAddress.getByName("192.0.4.4"));
+
+ assertTrue(authorize(authorizer, session, READ, resource), "superuser
with custom principal always has access, no matter what acls.");
+ }
+
+ @Test
+ public void testWildCardAcls() throws Exception {
+ assertFalse(authorize(authorizer, requestContext, READ, resource),
"when acls = [], authorizer should fail close.");
+
+ KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
username);
+ InetAddress host1 = InetAddress.getByName("192.168.3.1");
+ AccessControlEntry readAcl = new AccessControlEntry(user1.toString(),
host1.getHostAddress(), READ, ALLOW);
+
+ Set<AccessControlEntry> acls = changeAclAndVerify(Set.of(),
Set.of(readAcl), Set.of(), wildCardResource);
+
+ RequestContext host1Context = newRequestContext(user1, host1);
+ assertTrue(authorize(authorizer, host1Context, READ, resource), "User1
should have READ access from host1");
+
+ //allow WRITE to specific topic.
+ AccessControlEntry writeAcl = new AccessControlEntry(user1.toString(),
host1.getHostAddress(), WRITE, ALLOW);
+ changeAclAndVerify(Set.of(), Set.of(writeAcl), Set.of());
+
+ //deny WRITE to wild card topic.
+ AccessControlEntry denyWriteOnWildCardResourceAcl = new
AccessControlEntry(user1.toString(), host1.getHostAddress(), WRITE, DENY);
+ changeAclAndVerify(acls, Set.of(denyWriteOnWildCardResourceAcl),
Set.of(), wildCardResource);
+
+ assertFalse(authorize(authorizer, host1Context, WRITE, resource),
"User1 should not have WRITE access from host1");
+ }
+
+ @Test
+ public void testNoAclFound() {
+ assertFalse(authorize(authorizer, requestContext, READ, resource),
"when acls = [], authorizer should deny op.");
+ }
+
+ @Test
+ public void testNoAclFoundOverride() throws IOException {
+ Map<String, Object> cfg = configs();
+ cfg.put(StandardAuthorizer.ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG,
"true");
+
+ try (Authorizer testAuthorizer = createAuthorizer(cfg)) {
+ assertTrue(authorize(testAuthorizer, requestContext, READ,
resource),
+ "when acls = null or [], authorizer should allow op with
allow.everyone = true.");
+ }
+ }
+
+ @Test
+ public void testAclConfigWithWhitespace() throws IOException {
+ Map<String, Object> cfg = configs();
+ cfg.put(StandardAuthorizer.ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "
true");
+ // replace all property values with leading & trailing whitespaces
+ cfg.replaceAll((k, v) -> " " + v + " ");
+
+ try (Authorizer testAuthorizer = createAuthorizer(cfg)) {
+ assertTrue(authorize(testAuthorizer, requestContext, READ,
resource),
+ "when acls = null or [], authorizer should allow op with
allow.everyone = true.");
+ }
+ }
+
+ @Test
+ public void testAclManagementAPIs() throws Exception {
+ KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
username);
+ KafkaPrincipal user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
"bob");
+ String host1 = "host1";
+ String host2 = "host2";
+
+ AccessControlEntry acl1 = new AccessControlEntry(user1.toString(),
host1, READ, ALLOW);
+ AccessControlEntry acl2 = new AccessControlEntry(user1.toString(),
host1, WRITE, ALLOW);
+ AccessControlEntry acl3 = new AccessControlEntry(user2.toString(),
host2, READ, ALLOW);
+ AccessControlEntry acl4 = new AccessControlEntry(user2.toString(),
host2, WRITE, ALLOW);
+
+ Set<AccessControlEntry> acls = changeAclAndVerify(Set.of(),
Set.of(acl1, acl2, acl3, acl4), Set.of());
+
+ //test addAcl is additive
+ AccessControlEntry acl5 = new AccessControlEntry(user2.toString(),
WILDCARD_HOST, READ, ALLOW);
+ acls = changeAclAndVerify(acls, Set.of(acl5), Set.of());
+
+ //test get by principal name.
+ TestUtils.waitForCondition(() -> Set.of(acl1, acl2).stream().map(acl
-> new AclBinding(resource,
acl)).collect(Collectors.toSet()).equals(getAcls(authorizer, user1)),
+ "changes not propagated in timeout period");
+ TestUtils.waitForCondition(() -> Set.of(acl3, acl4,
acl5).stream().map(acl -> new AclBinding(resource,
acl)).collect(Collectors.toSet()).equals(getAcls(authorizer, user2)),
+ "changes not propagated in timeout period");
+
+ Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcls = Map.of(
+ new ResourcePattern(TOPIC, WILDCARD_RESOURCE, LITERAL),
Set.of(new AccessControlEntry(user2.toString(), WILDCARD_HOST, READ, ALLOW)),
+ new ResourcePattern(CLUSTER, WILDCARD_RESOURCE, LITERAL),
Set.of(new AccessControlEntry(user2.toString(), host1, READ, ALLOW)),
+ new ResourcePattern(GROUP, WILDCARD_RESOURCE, LITERAL), acls,
+ new ResourcePattern(GROUP, "test-ConsumerGroup", LITERAL), acls
+ );
+
+ for (Map.Entry<ResourcePattern, Set<AccessControlEntry>> entry :
resourceToAcls.entrySet()) {
+ ResourcePattern key = entry.getKey();
+ Set<AccessControlEntry> value = entry.getValue();
+ changeAclAndVerify(Set.of(), value, Set.of(), key);
+ }
+
+ Set<AclBinding> expectedAcls = new HashSet<>();
+ resourceToAcls.forEach((res, aces) ->
+ aces.forEach(ace -> expectedAcls.add(new AclBinding(res, ace)))
+ );
+ acls.forEach(acl -> expectedAcls.add(new AclBinding(resource, acl)));
+ TestUtils.waitForCondition(() ->
expectedAcls.equals(getAcls(authorizer)), "changes not propagated in timeout
period.");
+
+ //test remove acl from existing acls.
+ changeAclAndVerify(acls, Set.of(), Set.of(acl1, acl5));
+
+ //test remove all acls for resource
+ removeAcls(authorizer, Set.of(), resource);
+ ServerTestUtils.waitAndVerifyAcls(Set.of(), authorizer, resource,
AccessControlEntryFilter.ANY);
+
+ acls = changeAclAndVerify(Set.of(), Set.of(acl1), Set.of());
+ changeAclAndVerify(acls, Set.of(), acls);
+ }
+
+ @Test
+ public void testLocalConcurrentModificationOfResourceAcls() throws
Exception {
+ ResourcePattern commonResource = new ResourcePattern(TOPIC, "test",
LITERAL);
+
+ KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
username);
+ AccessControlEntry acl1 = new AccessControlEntry(user1.toString(),
WILDCARD_HOST, READ, ALLOW);
+
+ KafkaPrincipal user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
"bob");
+ AccessControlEntry acl2 = new AccessControlEntry(user2.toString(),
WILDCARD_HOST, READ, DENY);
+
+ addAcls(authorizer, Set.of(acl1), commonResource);
+ addAcls(authorizer, Set.of(acl2), commonResource);
+
+ ServerTestUtils.waitAndVerifyAcls(Set.of(acl1, acl2), authorizer,
commonResource, AccessControlEntryFilter.ANY);
+ }
+
+ /**
+ * Test ACL inheritance, as described in {@link
org.apache.kafka.common.acl.AclOperation}
+ */
+ @Test
+ public void testAclInheritance() throws Exception {
+ testImplicationsOfAllow(AclOperation.ALL, Set.of(READ, WRITE, CREATE,
DELETE, ALTER, DESCRIBE,
+ CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS,
IDEMPOTENT_WRITE, CREATE_TOKENS, DESCRIBE_TOKENS, TWO_PHASE_COMMIT));
+ testImplicationsOfDeny(AclOperation.ALL, Set.of(READ, WRITE, CREATE,
DELETE, ALTER, DESCRIBE,
+ CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS,
IDEMPOTENT_WRITE, CREATE_TOKENS, DESCRIBE_TOKENS, TWO_PHASE_COMMIT));
+ testImplicationsOfAllow(READ, Set.of(DESCRIBE));
+ testImplicationsOfAllow(WRITE, Set.of(DESCRIBE));
+ testImplicationsOfAllow(DELETE, Set.of(DESCRIBE));
+ testImplicationsOfAllow(ALTER, Set.of(DESCRIBE));
+ testImplicationsOfDeny(DESCRIBE, Set.of());
+ testImplicationsOfAllow(ALTER_CONFIGS, Set.of(DESCRIBE_CONFIGS));
+ testImplicationsOfDeny(DESCRIBE_CONFIGS, Set.of());
+ }
+
+ private void testImplicationsOfAllow(AclOperation parentOp,
Set<AclOperation> allowedOps) throws Exception {
+ KafkaPrincipal user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
username);
+ InetAddress host = InetAddress.getByName("192.168.3.1");
+ RequestContext hostContext = newRequestContext(user, host);
+ AccessControlEntry acl = new AccessControlEntry(user.toString(),
WILDCARD_HOST, parentOp, ALLOW);
+ addAcls(authorizer, Set.of(acl), clusterResource);
+ for (AclOperation op : AclOperation.values()) {
+ if (invalidOp(op)) continue;
+ boolean authorized = authorize(authorizer, hostContext, op,
clusterResource);
+ if (allowedOps.contains(op) || op == parentOp) {
+ assertTrue(authorized, "ALLOW " + parentOp + " should imply
ALLOW " + op);
+ } else {
+ assertFalse(authorized, "ALLOW " + parentOp + " should not
imply ALLOW " + op);
+ }
+ }
+ removeAcls(authorizer, Set.of(acl), clusterResource);
+ }
+
+ private void testImplicationsOfDeny(AclOperation parentOp,
Set<AclOperation> deniedOps) throws Exception {
+ KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
username);
+ InetAddress host1 = InetAddress.getByName("192.168.3.1");
+ RequestContext host1Context = newRequestContext(user1, host1);
+ Set<AccessControlEntry> acls = Set.of(new
AccessControlEntry(user1.toString(), WILDCARD_HOST, parentOp, DENY),
+ new AccessControlEntry(user1.toString(), WILDCARD_HOST,
AclOperation.ALL, ALLOW));
+ addAcls(authorizer, acls, clusterResource);
+ for (AclOperation op : AclOperation.values()) {
+ if (invalidOp(op)) continue;
+ boolean authorized = authorize(authorizer, host1Context, op,
clusterResource);
+ if (deniedOps.contains(op) || op == parentOp) {
+ assertFalse(authorized, "DENY " + parentOp + " should imply
DENY " + op);
+ } else {
+ assertTrue(authorized, "DENY " + parentOp + " should not imply
DENY " + op);
+ }
+ }
+ removeAcls(authorizer, acls, clusterResource);
+ }
+
+ @Test
+ public void testAccessAllowedIfAllowAclExistsOnWildcardResource() throws
Exception {
+ addAcls(authorizer, Set.of(allowReadAcl), wildCardResource);
+
+ assertTrue(authorize(authorizer, requestContext, READ, resource));
+ }
+
+ @Test
+ public void testDeleteAclOnWildcardResource() throws Exception {
+ addAcls(authorizer, Set.of(allowReadAcl, allowWriteAcl),
wildCardResource);
+
+ removeAcls(authorizer, Set.of(allowReadAcl), wildCardResource);
+
+ assertEquals(Set.of(allowWriteAcl), getAcls(authorizer,
wildCardResource));
+ }
+
+ @Test
+ public void testDeleteAllAclOnWildcardResource() throws Exception {
+ addAcls(authorizer, Set.of(allowReadAcl), wildCardResource);
+
+ removeAcls(authorizer, Set.of(), wildCardResource);
+
+ assertEquals(Set.of(), getAcls(authorizer));
+ }
+
+ @Test
+ public void testAccessAllowedIfAllowAclExistsOnPrefixedResource() throws
Exception {
+ addAcls(authorizer, Set.of(allowReadAcl), prefixedResource);
+
+ assertTrue(authorize(authorizer, requestContext, READ, resource));
+ }
+
+ @Test
+ public void testDeleteAclOnPrefixedResource() throws Exception {
+ addAcls(authorizer, Set.of(allowReadAcl, allowWriteAcl),
prefixedResource);
+
+ removeAcls(authorizer, Set.of(allowReadAcl), prefixedResource);
+
+ assertEquals(Set.of(allowWriteAcl), getAcls(authorizer,
prefixedResource));
+ }
+
+ @Test
+ public void testDeleteAllAclOnPrefixedResource() throws Exception {
+ addAcls(authorizer, Set.of(allowReadAcl, allowWriteAcl),
prefixedResource);
+
+ removeAcls(authorizer, Set.of(), prefixedResource);
+
+ assertEquals(Set.of(), getAcls(authorizer));
+ }
+
+ @Test
+ public void testAddAclsOnLiteralResource() throws Exception {
+ addAcls(authorizer, Set.of(allowReadAcl, allowWriteAcl), resource);
+ addAcls(authorizer, Set.of(allowWriteAcl, denyReadAcl), resource);
+
+ assertEquals(Set.of(allowReadAcl, allowWriteAcl, denyReadAcl),
getAcls(authorizer, resource));
+ assertEquals(Set.of(), getAcls(authorizer, wildCardResource));
+ assertEquals(Set.of(), getAcls(authorizer, prefixedResource));
+ }
+
+ @Test
+ public void testAddAclsOnWildcardResource() throws Exception {
+ addAcls(authorizer, Set.of(allowReadAcl, allowWriteAcl),
wildCardResource);
+ addAcls(authorizer, Set.of(allowWriteAcl, denyReadAcl),
wildCardResource);
+
+ assertEquals(Set.of(allowReadAcl, allowWriteAcl, denyReadAcl),
getAcls(authorizer, wildCardResource));
+ assertEquals(Set.of(), getAcls(authorizer, resource));
+ assertEquals(Set.of(), getAcls(authorizer, prefixedResource));
+ }
+
+ @Test
+ public void testAddAclsOnPrefixedResource() throws Exception {
+ addAcls(authorizer, Set.of(allowReadAcl, allowWriteAcl),
prefixedResource);
+ addAcls(authorizer, Set.of(allowWriteAcl, denyReadAcl),
prefixedResource);
+
+ assertEquals(Set.of(allowReadAcl, allowWriteAcl, denyReadAcl),
getAcls(authorizer, prefixedResource));
+ assertEquals(Set.of(), getAcls(authorizer, wildCardResource));
+ assertEquals(Set.of(), getAcls(authorizer, resource));
+ }
+
+ @Test
+ public void testAuthorizeWithPrefixedResource() throws Exception {
+ addAcls(authorizer, Set.of(denyReadAcl), new ResourcePattern(TOPIC,
"a_other", LITERAL));
+ addAcls(authorizer, Set.of(denyReadAcl), new ResourcePattern(TOPIC,
"a_other", PREFIXED));
+ addAcls(authorizer, Set.of(denyReadAcl), new ResourcePattern(TOPIC,
"foo-" + UUID.randomUUID(), PREFIXED));
+ addAcls(authorizer, Set.of(denyReadAcl), new ResourcePattern(TOPIC,
"foo-" + UUID.randomUUID(), PREFIXED));
+ addAcls(authorizer, Set.of(denyReadAcl), new ResourcePattern(TOPIC,
"foo-" + UUID.randomUUID() + "-zzz", PREFIXED));
+ addAcls(authorizer, Set.of(denyReadAcl), new ResourcePattern(TOPIC,
"fooo-" + UUID.randomUUID(), PREFIXED));
+ addAcls(authorizer, Set.of(denyReadAcl), new ResourcePattern(TOPIC,
"fo-" + UUID.randomUUID(), PREFIXED));
+ addAcls(authorizer, Set.of(denyReadAcl), new ResourcePattern(TOPIC,
"fop-" + UUID.randomUUID(), PREFIXED));
+ addAcls(authorizer, Set.of(denyReadAcl), new ResourcePattern(TOPIC,
"fon-" + UUID.randomUUID(), PREFIXED));
+ addAcls(authorizer, Set.of(denyReadAcl), new ResourcePattern(TOPIC,
"fon-", PREFIXED));
+ addAcls(authorizer, Set.of(denyReadAcl), new ResourcePattern(TOPIC,
"z_other", PREFIXED));
+ addAcls(authorizer, Set.of(denyReadAcl), new ResourcePattern(TOPIC,
"z_other", LITERAL));
+
+ addAcls(authorizer, Set.of(allowReadAcl), prefixedResource);
+
+ assertTrue(authorize(authorizer, requestContext, READ, resource));
+ }
+
+ @Test
+ public void testSingleCharacterResourceAcls() throws Exception {
+ addAcls(authorizer, Set.of(allowReadAcl), new ResourcePattern(TOPIC,
"f", LITERAL));
+ assertTrue(authorize(authorizer, requestContext, READ, new
ResourcePattern(TOPIC, "f", LITERAL)));
+ assertFalse(authorize(authorizer, requestContext, READ, new
ResourcePattern(TOPIC, "foo", LITERAL)));
+
+ addAcls(authorizer, Set.of(allowReadAcl), new ResourcePattern(TOPIC,
"_", PREFIXED));
+ assertTrue(authorize(authorizer, requestContext, READ, new
ResourcePattern(TOPIC, "_foo", LITERAL)));
+ assertTrue(authorize(authorizer, requestContext, READ, new
ResourcePattern(TOPIC, "_", LITERAL)));
+ assertFalse(authorize(authorizer, requestContext, READ, new
ResourcePattern(TOPIC, "foo_", LITERAL)));
+ }
+
+ @Test
+ public void testGetAclsPrincipal() throws Exception {
+ AccessControlEntry aclOnSpecificPrincipal = new
AccessControlEntry(principal.toString(), WILDCARD_HOST, WRITE, ALLOW);
+ addAcls(authorizer, Set.of(aclOnSpecificPrincipal), resource);
+
+ assertEquals(0,
+ getAcls(authorizer, wildcardPrincipal).size(), "acl on
specific should not be returned for wildcard request");
+ assertEquals(1,
+ getAcls(authorizer, principal).size(), "acl on specific should
be returned for specific request");
+ assertEquals(1,
+ getAcls(authorizer, new
KafkaPrincipal(principal.getPrincipalType(), principal.getName())).size(), "acl
on specific should be returned for different principal instance");
+
+ removeAcls(authorizer, Set.of(), resource);
+ AccessControlEntry aclOnWildcardPrincipal = new
AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, WRITE, ALLOW);
+ addAcls(authorizer, Set.of(aclOnWildcardPrincipal), resource);
+
+ assertEquals(1, getAcls(authorizer, wildcardPrincipal).size(), "acl on
wildcard should be returned for wildcard request");
+ assertEquals(0, getAcls(authorizer, principal).size(), "acl on
wildcard should not be returned for specific request");
+ }
+
+ @Test
+ public void testAclsFilter() throws Exception {
+ ResourcePattern resource1 = new ResourcePattern(TOPIC, "foo-" +
UUID.randomUUID(), LITERAL);
+ ResourcePattern resource2 = new ResourcePattern(TOPIC, "bar-" +
UUID.randomUUID(), LITERAL);
+ ResourcePattern prefixedResource = new ResourcePattern(TOPIC, "bar-",
PREFIXED);
+
+ AclBinding acl1 = new AclBinding(resource1, new
AccessControlEntry(principal.toString(), WILDCARD_HOST, READ, ALLOW));
+ AclBinding acl2 = new AclBinding(resource1, new
AccessControlEntry(principal.toString(), "192.168.0.1", WRITE, ALLOW));
+ AclBinding acl3 = new AclBinding(resource2, new
AccessControlEntry(principal.toString(), WILDCARD_HOST, DESCRIBE, ALLOW));
+ AclBinding acl4 = new AclBinding(prefixedResource, new
AccessControlEntry(wildcardPrincipal.toString(), WILDCARD_HOST, READ, ALLOW));
+
+ authorizer.createAcls(requestContext, List.of(acl1, acl2, acl3, acl4));
+ assertEquals(Set.of(acl1, acl2, acl3, acl4),
toSet(authorizer.acls(AclBindingFilter.ANY)));
+ assertEquals(Set.of(acl1, acl2), toSet(authorizer.acls(new
AclBindingFilter(resource1.toFilter(), AccessControlEntryFilter.ANY))));
+ assertEquals(Set.of(acl4), toSet(authorizer.acls(new
AclBindingFilter(prefixedResource.toFilter(), AccessControlEntryFilter.ANY))));
+ AclBindingFilter matchingFilter = new AclBindingFilter(new
ResourcePatternFilter(ResourceType.ANY, resource2.name(), MATCH),
AccessControlEntryFilter.ANY);
+ assertEquals(Set.of(acl3, acl4),
toSet(authorizer.acls(matchingFilter)));
+
+ List<AclBindingFilter> filters = List.of(matchingFilter,
+ acl1.toFilter(),
+ new AclBindingFilter(resource2.toFilter(),
AccessControlEntryFilter.ANY),
+ new AclBindingFilter(new ResourcePatternFilter(TOPIC, "baz",
PatternType.ANY), AccessControlEntryFilter.ANY));
+ List<AclDeleteResult> deleteResults = new ArrayList<>();
+ for (CompletionStage<AclDeleteResult> stage :
authorizer.deleteAcls(requestContext, filters)) {
+ deleteResults.add(stage.toCompletableFuture().get());
+ }
+ assertTrue(deleteResults.stream().noneMatch(r ->
r.exception().isPresent()));
+ for (int i = 0; i < filters.size(); i++) {
+
assertTrue(deleteResults.get(i).aclBindingDeleteResults().stream().noneMatch(r
-> r.exception().isPresent()));
+ }
+ assertEquals(Set.of(acl3, acl4),
deleteResults.get(0).aclBindingDeleteResults().stream().map(AclDeleteResult.AclBindingDeleteResult::aclBinding).collect(Collectors.toSet()));
+ assertEquals(Set.of(acl1),
deleteResults.get(1).aclBindingDeleteResults().stream().map(AclDeleteResult.AclBindingDeleteResult::aclBinding).collect(Collectors.toSet()));
+ // standard authorizer first finds the acls that match filters and
then delete them.
+ // So filters[2] will match acl3 even though it is also matching
filters[0] and will be deleted by it
+ assertEquals(Set.of(acl3),
deleteResults.get(2).aclBindingDeleteResults().stream().map(AclDeleteResult.AclBindingDeleteResult::aclBinding).collect(Collectors.toSet()));
+ assertEquals(Set.of(),
deleteResults.get(3).aclBindingDeleteResults().stream().map(AclDeleteResult.AclBindingDeleteResult::aclBinding).collect(Collectors.toSet()));
+ }
+
+ @Test
+ public void testAuthorizeByResourceTypeNoAclFoundOverride() throws
IOException {
+ Map<String, Object> cfg = configs();
+ cfg.put(StandardAuthorizer.ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG,
"true");
+
+ try (Authorizer authorizer = createAuthorizer(cfg)) {
+ assertTrue(authorizeByResourceType(authorizer, requestContext,
READ, resource.resourceType()),
+ "If allow.everyone.if.no.acl.found = true, caller should
have read access to at least one topic");
+ assertTrue(authorizeByResourceType(authorizer, requestContext,
WRITE, resource.resourceType()),
+ "If allow.everyone.if.no.acl.found = true, caller should
have write access to at least one topic");
+ }
+ }
+
+ private <T> Set<T> toSet(Iterable<T> iterable) {
+ return StreamSupport.stream(iterable.spliterator(),
false).collect(Collectors.toSet());
+ }
+
+ private Set<AccessControlEntry> changeAclAndVerify(Set<AccessControlEntry>
originalAcls,
+ Set<AccessControlEntry>
addedAcls,
+ Set<AccessControlEntry>
removedAcls) throws Exception {
+ return changeAclAndVerify(originalAcls, addedAcls, removedAcls,
resource);
+ }
+
+ private Set<AccessControlEntry> changeAclAndVerify(Set<AccessControlEntry>
originalAcls,
+ Set<AccessControlEntry>
addedAcls,
+ Set<AccessControlEntry>
removedAcls,
+ ResourcePattern
resource) throws Exception {
+ Set<AccessControlEntry> acls = new HashSet<>(originalAcls);
+
+ if (!addedAcls.isEmpty()) {
+ addAcls(authorizer, addedAcls, resource);
+ acls.addAll(addedAcls);
+ }
+
+ if (!removedAcls.isEmpty()) {
+ removeAcls(authorizer, removedAcls, resource);
+ acls.removeAll(removedAcls);
+ }
+
+ ServerTestUtils.waitAndVerifyAcls(acls, authorizer, resource,
AccessControlEntryFilter.ANY);
+
+ return acls;
+ }
+
+ private boolean authorize(Authorizer authorizer, RequestContext
requestContext, AclOperation operation, ResourcePattern resource) {
+ Action action = new Action(operation, resource, 1, true, true);
+ return authorizer.authorize(requestContext, List.of(action)).get(0) ==
AuthorizationResult.ALLOWED;
+ }
+
+ private Set<AccessControlEntry> getAcls(Authorizer authorizer,
ResourcePattern resourcePattern) {
+ return toSet(authorizer.acls(new
AclBindingFilter(resourcePattern.toFilter(),
AccessControlEntryFilter.ANY))).stream()
+ .map(AclBinding::entry).collect(Collectors.toSet());
+ }
+
+ private Set<AclBinding> getAcls(Authorizer authorizer, KafkaPrincipal
principal) {
+ AclBindingFilter filter = new
AclBindingFilter(ResourcePatternFilter.ANY,
+ new AccessControlEntryFilter(principal.toString(), null,
AclOperation.ANY, AclPermissionType.ANY));
+ return toSet(authorizer.acls(filter));
+ }
+
+ private Set<AclBinding> getAcls(Authorizer authorizer) {
+ return toSet(authorizer.acls(AclBindingFilter.ANY));
+ }
+
+ private boolean invalidOp(AclOperation op) {
+ return op == AclOperation.ANY || op == AclOperation.UNKNOWN;
+ }
+
+ private Authorizer createAuthorizer(Map<String, ?> configs) {
+ Metrics metrics = new Metrics();
+ metricsInstances.add(metrics);
+ PluginMetricsImpl pluginMetrics = new PluginMetricsImpl(metrics,
Map.of());
+ pluginMetricsInstances.add(pluginMetrics);
+ StandardAuthorizer authorizer = new StandardAuthorizer();
+ authorizer.configure(configs);
+ authorizer.withPluginMetrics(pluginMetrics);
+ authorizer.start(new AuthorizerTestServerInfo(List.of(plaintext)));
+ authorizer.setAclMutator(new MockAclMutator(authorizer));
+ authorizer.completeInitialLoad();
+ return authorizer;
+ }
+
+ private RequestContext newRequestContext(KafkaPrincipal principal,
InetAddress clientAddress) {
+ SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
+ RequestHeader header = new RequestHeader(ApiKeys.PRODUCE, (short) 2,
"", 1);
+ return new RequestContext(header, "", clientAddress, principal,
ListenerName.forSecurityProtocol(securityProtocol),
+ securityProtocol, ClientInformation.EMPTY, false);
+ }
+
+ private boolean authorizeByResourceType(Authorizer authorizer,
RequestContext requestContext, AclOperation operation, ResourceType
resourceType) {
+ return authorizer.authorizeByResourceType(requestContext, operation,
resourceType) == AuthorizationResult.ALLOWED;
+ }
+
+ private void addAcls(Authorizer authorizer, Set<AccessControlEntry> aces,
ResourcePattern resourcePattern) throws Exception {
+ List<AclBinding> bindings = aces.stream().map(ace -> new
AclBinding(resourcePattern, ace)).toList();
+ List<? extends CompletionStage<AclCreateResult>> results =
authorizer.createAcls(requestContext, bindings);
+ for (CompletionStage<AclCreateResult> ac : results) {
+ AclCreateResult result = ac.toCompletableFuture().get();
+ result.exception().ifPresent(e -> {
+ throw e;
+ });
+ }
+ }
+
+ private void removeAcls(Authorizer authorizer, Set<AccessControlEntry>
aces, ResourcePattern resourcePattern) throws Exception {
+ List<AclBindingFilter> filters = aces.isEmpty()
+ ? List.of(new AclBindingFilter(resourcePattern.toFilter(),
AccessControlEntryFilter.ANY))
+ : aces.stream().map(ace -> new AclBinding(resourcePattern,
ace).toFilter()).toList();
+
+ for (CompletionStage<AclDeleteResult> stage :
authorizer.deleteAcls(requestContext, filters)) {
+ AclDeleteResult result = stage.toCompletableFuture().get();
+ result.exception().ifPresent(e -> {
+ throw e;
+ });
+ for (AclDeleteResult.AclBindingDeleteResult r :
result.aclBindingDeleteResults()) {
+ r.exception().ifPresent(e -> {
+ throw e;
+ });
+ }
+ }
+ }
+
+}