This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f4d38b996b6 KAFKA-20564 Move AuthorizerTest to server (#22261)
f4d38b996b6 is described below

commit f4d38b996b688959043623162568c2f650a79463
Author: Mickael Maison <[email protected]>
AuthorDate: Fri May 15 15:08:41 2026 +0200

    KAFKA-20564 Move AuthorizerTest to server (#22261)
    
    Merge BaseAuthorizerTest and AuthorizerTest to move them to the server
    module
    
    Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 build.gradle                                       |    1 +
 checkstyle/import-control-server-common.xml        |    1 +
 checkstyle/import-control-server.xml               |    1 +
 .../kafka/security/authorizer/AuthorizerTest.scala |  682 -------------
 .../security/authorizer/BaseAuthorizerTest.scala   |  373 -------
 .../apache/kafka/server/util/ServerTestUtils.java  |   35 +
 .../kafka/security/authorizer/AuthorizerTest.java  | 1069 ++++++++++++++++++++
 7 files changed, 1107 insertions(+), 1055 deletions(-)

diff --git a/build.gradle b/build.gradle
index 9f1329dccff..9cee79e0610 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1035,6 +1035,7 @@ project(':server') {
     testImplementation project(':test-common:test-common-runtime')
     testImplementation testFixtures(project(':storage:storage-api'))
     testImplementation testFixtures(project(':server-common'))
+    testImplementation testFixtures(project(':metadata'))
 
     testFixturesImplementation project(':clients')
     testFixturesImplementation testFixtures(project(':clients'))
diff --git a/checkstyle/import-control-server-common.xml b/checkstyle/import-control-server-common.xml
index fd44a4a3f4b..7d075dd71d7 100644
--- a/checkstyle/import-control-server-common.xml
+++ b/checkstyle/import-control-server-common.xml
@@ -134,6 +134,7 @@
 
             <allow class="org.apache.kafka.server.util.TopicFilter.IncludeList" />
             <allow class="org.apache.kafka.test.TestUtils" />
+            <allow class="org.apache.kafka.server.authorizer.Authorizer" />
             <!-- ServerTestUtils uses yammer metrics for test cleanup -->
             <allow pkg="org.apache.kafka.server.metrics" />
             <allow pkg="com.yammer.metrics.core" />
diff --git a/checkstyle/import-control-server.xml b/checkstyle/import-control-server.xml
index 9f7cc661a66..db528b8c717 100644
--- a/checkstyle/import-control-server.xml
+++ b/checkstyle/import-control-server.xml
@@ -114,6 +114,7 @@
     <allow pkg="org.apache.kafka.network" />
     <allow pkg="org.apache.kafka.server" />
     <allow pkg="org.apache.kafka.server.authorizer" />
+    <allow class="org.apache.kafka.controller.MockAclMutator" />
   </subpackage>
 
   <subpackage name="network">
diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
deleted file mode 100644
index 14f4936c8a8..00000000000
--- a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
+++ /dev/null
@@ -1,682 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.security.authorizer
-
-import kafka.server.{KafkaConfig, QuorumTestHarness}
-import kafka.utils.TestUtils
-import org.apache.kafka.common.Endpoint
-import org.apache.kafka.common.acl.AclOperation._
-import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
-import org.apache.kafka.common.acl._
-import org.apache.kafka.common.errors.ApiException
-import org.apache.kafka.common.metrics.{Metrics, PluginMetrics}
-import org.apache.kafka.common.metrics.internals.PluginMetricsImpl
-import org.apache.kafka.common.requests.RequestContext
-import org.apache.kafka.common.resource.PatternType.{LITERAL, MATCH, PREFIXED}
-import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
-import org.apache.kafka.common.resource.ResourcePattern.WILDCARD_RESOURCE
-import org.apache.kafka.common.resource.ResourceType._
-import org.apache.kafka.common.resource.{PatternType, ResourcePattern, 
ResourcePatternFilter, ResourceType}
-import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
-import org.apache.kafka.common.utils.internals.{SecurityUtils => 
JSecurityUtils}
-import org.apache.kafka.controller.MockAclMutator
-import org.apache.kafka.metadata.authorizer.{AuthorizerTestServerInfo, 
StandardAuthorizer}
-import org.apache.kafka.security.authorizer.AclEntry.{WILDCARD_HOST, 
WILDCARD_PRINCIPAL_STRING}
-import org.apache.kafka.server.authorizer._
-import org.apache.kafka.server.util.ServerTestUtils
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
-import org.junit.jupiter.api.Test
-
-import java.net.InetAddress
-import java.util
-import java.util.{Properties, UUID}
-import scala.jdk.CollectionConverters._
-
-class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
-
-  private final val PLAINTEXT = new Endpoint("PLAINTEXT", 
SecurityProtocol.PLAINTEXT, "127.0.0.1", 9020)
-
-  private val allowReadAcl = new AccessControlEntry(WILDCARD_PRINCIPAL_STRING, 
WILDCARD_HOST, READ, ALLOW)
-  private val allowWriteAcl = new 
AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, WRITE, ALLOW)
-  private val denyReadAcl = new AccessControlEntry(WILDCARD_PRINCIPAL_STRING, 
WILDCARD_HOST, READ, DENY)
-
-  private val wildCardResource = new ResourcePattern(TOPIC, WILDCARD_RESOURCE, 
LITERAL)
-  private val prefixedResource = new ResourcePattern(TOPIC, "foo", PREFIXED)
-  private val clusterResource = new ResourcePattern(CLUSTER, CLUSTER_NAME, 
LITERAL)
-  private val wildcardPrincipal = 
JSecurityUtils.parseKafkaPrincipal(WILDCARD_PRINCIPAL_STRING)
-
-  private var authorizer1: Authorizer = _
-  private var authorizer2: Authorizer = _
-
-  private var _testInfo: TestInfo = _
-
-  class CustomPrincipal(principalType: String, name: String) extends 
KafkaPrincipal(principalType, name) {
-    override def equals(o: scala.Any): Boolean = false
-  }
-
-  override def authorizer: Authorizer = authorizer1
-
-  def testInfo: TestInfo = _testInfo
-
-  @BeforeEach
-  override def setUp(testInfo: TestInfo): Unit = {
-    super.setUp(testInfo)
-    _testInfo = testInfo
-
-    val props = properties
-    config = KafkaConfig.fromProps(props)
-    authorizer1 = createAuthorizer()
-    configureAuthorizer(authorizer1, config.originals, new 
PluginMetricsImpl(new Metrics(), util.Map.of()))
-    authorizer2 = createAuthorizer()
-    configureAuthorizer(authorizer2, config.originals, new 
PluginMetricsImpl(new Metrics(), util.Map.of()))
-    resource = new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), LITERAL)
-  }
-
-  def properties: Properties = {
-    val props = TestUtils.createBrokerConfig(0)
-    props.put(StandardAuthorizer.SUPER_USERS_CONFIG, superUsers)
-    props
-  }
-
-  @AfterEach
-  override def tearDown(): Unit = {
-    authorizer1.close()
-    authorizer2.close()
-    ServerTestUtils.clearYammerMetrics()
-    super.tearDown()
-  }
-
-  @Test
-  def testAuthorizeThrowsOnNonLiteralResource(): Unit = {
-    assertThrows(classOf[IllegalArgumentException], () => 
authorize(authorizer1, requestContext, READ,
-      new ResourcePattern(TOPIC, "something", PREFIXED)))
-  }
-
-  @Test
-  def testAuthorizeWithEmptyResourceName(): Unit = {
-    assertFalse(authorize(authorizer1, requestContext, READ, new 
ResourcePattern(GROUP, "", LITERAL)))
-    addAcls(authorizer1, Set(allowReadAcl), new ResourcePattern(GROUP, 
WILDCARD_RESOURCE, LITERAL))
-    assertTrue(authorize(authorizer1, requestContext, READ, new 
ResourcePattern(GROUP, "", LITERAL)))
-  }
-
-  // Authorizing the empty resource is not supported because empty resource 
name is invalid.
-  @Test
-  def testEmptyAclThrowsException(): Unit = {
-    assertThrows(classOf[ApiException],
-      () => addAcls(authorizer1, Set(allowReadAcl), new ResourcePattern(GROUP, 
"", LITERAL)))
-  }
-
-  @Test
-  def testTopicAcl(): Unit = {
-    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
-    val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "rob")
-    val user3 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "batman")
-    val host1 = InetAddress.getByName("192.168.1.1")
-    val host2 = InetAddress.getByName("192.168.1.2")
-
-    //user1 has READ access from host1 and host2.
-    val acl1 = new AccessControlEntry(user1.toString, host1.getHostAddress, 
READ, ALLOW)
-    val acl2 = new AccessControlEntry(user1.toString, host2.getHostAddress, 
READ, ALLOW)
-
-    //user1 does not have  READ access from host1.
-    val acl3 = new AccessControlEntry(user1.toString, host1.getHostAddress, 
READ, DENY)
-
-    //user1 has WRITE access from host1 only.
-    val acl4 = new AccessControlEntry(user1.toString, host1.getHostAddress, 
WRITE, ALLOW)
-
-    //user1 has DESCRIBE access from all hosts.
-    val acl5 = new AccessControlEntry(user1.toString, WILDCARD_HOST, DESCRIBE, 
ALLOW)
-
-    //user2 has READ access from all hosts.
-    val acl6 = new AccessControlEntry(user2.toString, WILDCARD_HOST, READ, 
ALLOW)
-
-    //user3 has WRITE access from all hosts.
-    val acl7 = new AccessControlEntry(user3.toString, WILDCARD_HOST, WRITE, 
ALLOW)
-
-    val acls = Set(acl1, acl2, acl3, acl4, acl5, acl6, acl7)
-
-    changeAclAndVerify(Set.empty, acls, Set.empty)
-
-    val host1Context = newRequestContext(user1, host1)
-    val host2Context = newRequestContext(user1, host2)
-
-    assertTrue(authorize(authorizer1, host2Context, READ, resource), "User1 
should have READ access from host2")
-    assertFalse(authorize(authorizer1, host1Context, READ, resource), "User1 
should not have READ access from host1 due to denyAcl")
-    assertTrue(authorize(authorizer1, host1Context, WRITE, resource), "User1 
should have WRITE access from host1")
-    assertFalse(authorize(authorizer1, host2Context, WRITE, resource), "User1 
should not have WRITE access from host2 as no allow acl is defined")
-    assertTrue(authorize(authorizer1, host1Context, DESCRIBE, resource), 
"User1 should not have DESCRIBE access from host1")
-    assertTrue(authorize(authorizer1, host2Context, DESCRIBE, resource), 
"User1 should have DESCRIBE access from host2")
-    assertFalse(authorize(authorizer1, host1Context, ALTER, resource), "User1 
should not have edit access from host1")
-    assertFalse(authorize(authorizer1, host2Context, ALTER, resource), "User1 
should not have edit access from host2")
-
-    //test if user has READ or WRITE access they also get DESCRIBE access
-    val user2Context = newRequestContext(user2, host1)
-    val user3Context = newRequestContext(user3, host1)
-    assertTrue(authorize(authorizer1, user2Context, DESCRIBE, resource), 
"User2 should have DESCRIBE access from host1")
-    assertTrue(authorize(authorizer1, user3Context, DESCRIBE, resource), 
"User3 should have DESCRIBE access from host2")
-    assertTrue(authorize(authorizer1, user2Context, READ, resource), "User2 
should have READ access from host1")
-    assertTrue(authorize(authorizer1, user3Context, WRITE, resource), "User3 
should have WRITE access from host2")
-  }
-
-  /**
-   * CustomPrincipals should be compared with their principal type and name
-   */
-  @Test
-  def testAllowAccessWithCustomPrincipal(): Unit = {
-    val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
-    val customUserPrincipal = new CustomPrincipal(KafkaPrincipal.USER_TYPE, 
username)
-    val host1 = InetAddress.getByName("192.168.1.1")
-    val host2 = InetAddress.getByName("192.168.1.2")
-
-    // user has READ access from host2 but not from host1
-    val acl1 = new AccessControlEntry(user.toString, host1.getHostAddress, 
READ, DENY)
-    val acl2 = new AccessControlEntry(user.toString, host2.getHostAddress, 
READ, ALLOW)
-    val acls = Set(acl1, acl2)
-    changeAclAndVerify(Set.empty, acls, Set.empty)
-
-    val host1Context = newRequestContext(customUserPrincipal, host1)
-    val host2Context = newRequestContext(customUserPrincipal, host2)
-
-    assertTrue(authorize(authorizer1, host2Context, READ, resource), "User1 
should have READ access from host2")
-    assertFalse(authorize(authorizer1, host1Context, READ, resource), "User1 
should not have READ access from host1 due to denyAcl")
-  }
-
-  @Test
-  def testDenyTakesPrecedence(): Unit = {
-    val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
-    val host = InetAddress.getByName("192.168.2.1")
-    val session = newRequestContext(user, host)
-
-    val allowAll = new AccessControlEntry(WILDCARD_PRINCIPAL_STRING, 
WILDCARD_HOST, AclOperation.ALL, ALLOW)
-    val denyAcl = new AccessControlEntry(user.toString, host.getHostAddress, 
AclOperation.ALL, DENY)
-    val acls = Set(allowAll, denyAcl)
-
-    changeAclAndVerify(Set.empty, acls, Set.empty)
-
-    assertFalse(authorize(authorizer1, session, READ, resource), "deny should 
take precedence over allow.")
-  }
-
-  @Test
-  def testAllowAllAccess(): Unit = {
-    val allowAllAcl = new AccessControlEntry(WILDCARD_PRINCIPAL_STRING, 
WILDCARD_HOST, AclOperation.ALL, ALLOW)
-
-    changeAclAndVerify(Set.empty, Set(allowAllAcl), Set.empty)
-
-    val context = newRequestContext(new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "random"), 
InetAddress.getByName("192.0.4.4"))
-    assertTrue(authorize(authorizer1, context, READ, resource), "allow all acl 
should allow access to all.")
-  }
-
-  @Test
-  def testSuperUserHasAccess(): Unit = {
-    val denyAllAcl = new AccessControlEntry(WILDCARD_PRINCIPAL_STRING, 
WILDCARD_HOST, AclOperation.ALL, DENY)
-
-    changeAclAndVerify(Set.empty, Set(denyAllAcl), Set.empty)
-
-    val session1 = newRequestContext(new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"), 
InetAddress.getByName("192.0.4.4"))
-    val session2 = newRequestContext(new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser2"), 
InetAddress.getByName("192.0.4.4"))
-
-    assertTrue(authorize(authorizer1, session1, READ, resource), "superuser 
always has access, no matter what acls.")
-    assertTrue(authorize(authorizer1, session2, READ, resource), "superuser 
always has access, no matter what acls.")
-  }
-
-  /**
-   * CustomPrincipals should be compared with their principal type and name
-   */
-  @Test
-  def testSuperUserWithCustomPrincipalHasAccess(): Unit = {
-    val denyAllAcl = new AccessControlEntry(WILDCARD_PRINCIPAL_STRING, 
WILDCARD_HOST, AclOperation.ALL, DENY)
-    changeAclAndVerify(Set.empty, Set(denyAllAcl), Set.empty)
-
-    val session = newRequestContext(new 
CustomPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"), 
InetAddress.getByName("192.0.4.4"))
-
-    assertTrue(authorize(authorizer1, session, READ, resource), "superuser 
with custom principal always has access, no matter what acls.")
-  }
-
-  @Test
-  def testWildCardAcls(): Unit = {
-    assertFalse(authorize(authorizer1, requestContext, READ, resource), "when 
acls = [], authorizer should fail close.")
-
-    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
-    val host1 = InetAddress.getByName("192.168.3.1")
-    val readAcl = new AccessControlEntry(user1.toString, host1.getHostAddress, 
READ, ALLOW)
-
-    val acls = changeAclAndVerify(Set.empty, Set(readAcl), Set.empty, 
wildCardResource)
-
-    val host1Context = newRequestContext(user1, host1)
-    assertTrue(authorize(authorizer1, host1Context, READ, resource), "User1 
should have READ access from host1")
-
-    //allow WRITE to specific topic.
-    val writeAcl = new AccessControlEntry(user1.toString, 
host1.getHostAddress, WRITE, ALLOW)
-    changeAclAndVerify(Set.empty, Set(writeAcl), Set.empty)
-
-    //deny WRITE to wild card topic.
-    val denyWriteOnWildCardResourceAcl = new 
AccessControlEntry(user1.toString, host1.getHostAddress, WRITE, DENY)
-    changeAclAndVerify(acls, Set(denyWriteOnWildCardResourceAcl), Set.empty, 
wildCardResource)
-
-    assertFalse(authorize(authorizer1, host1Context, WRITE, resource), "User1 
should not have WRITE access from host1")
-  }
-
-  @Test
-  def testNoAclFound(): Unit = {
-    assertFalse(authorize(authorizer1, requestContext, READ, resource), "when 
acls = [], authorizer should deny op.")
-  }
-
-  @Test
-  def testNoAclFoundOverride(): Unit = {
-    val props = properties
-    props.put(StandardAuthorizer.ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, 
"true")
-
-    val cfg = KafkaConfig.fromProps(props)
-    val testAuthorizer = createAuthorizer()
-    try {
-      configureAuthorizer(testAuthorizer, cfg.originals, new 
PluginMetricsImpl(new Metrics(), util.Map.of()))
-      assertTrue(authorize(testAuthorizer, requestContext, READ, resource),
-        "when acls = null or [],  authorizer should allow op with 
allow.everyone = true.")
-    } finally {
-      testAuthorizer.close()
-    }
-  }
-
-  @Test
-  def testAclConfigWithWhitespace(): Unit = {
-    val props = properties
-    props.put(StandardAuthorizer.ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, " 
true")
-    // replace all property values with leading & trailing whitespaces
-    props.replaceAll((_, v) => " " + v + " ")
-    val cfg = KafkaConfig.fromProps(props)
-    val testAuthorizer = createAuthorizer()
-    try {
-      configureAuthorizer(testAuthorizer, cfg.originals, new 
PluginMetricsImpl(new Metrics(), util.Map.of()))
-      assertTrue(authorize(testAuthorizer, requestContext, READ, resource),
-        "when acls = null or [],  authorizer should allow op with 
allow.everyone = true.")
-    } finally {
-      testAuthorizer.close()
-    }
-  }
-
-  @Test
-  def testAclManagementAPIs(): Unit = {
-    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
-    val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
-    val host1 = "host1"
-    val host2 = "host2"
-
-    val acl1 = new AccessControlEntry(user1.toString, host1, READ, ALLOW)
-    val acl2 = new AccessControlEntry(user1.toString, host1, WRITE, ALLOW)
-    val acl3 = new AccessControlEntry(user2.toString, host2, READ, ALLOW)
-    val acl4 = new AccessControlEntry(user2.toString, host2, WRITE, ALLOW)
-
-    var acls = changeAclAndVerify(Set.empty, Set(acl1, acl2, acl3, acl4), 
Set.empty)
-
-    //test addAcl is additive
-    val acl5 = new AccessControlEntry(user2.toString, WILDCARD_HOST, READ, 
ALLOW)
-    acls = changeAclAndVerify(acls, Set(acl5), Set.empty)
-
-    //test get by principal name.
-    TestUtils.waitUntilTrue(() => Set(acl1, acl2).map(acl => new 
AclBinding(resource, acl)) == getAcls(authorizer1, user1),
-      "changes not propagated in timeout period")
-    TestUtils.waitUntilTrue(() => Set(acl3, acl4, acl5).map(acl => new 
AclBinding(resource, acl)) == getAcls(authorizer1, user2),
-      "changes not propagated in timeout period")
-
-    val resourceToAcls = Map[ResourcePattern, Set[AccessControlEntry]](
-      new ResourcePattern(TOPIC, WILDCARD_RESOURCE, LITERAL) -> Set(new 
AccessControlEntry(user2.toString, WILDCARD_HOST, READ, ALLOW)),
-      new ResourcePattern(CLUSTER, WILDCARD_RESOURCE, LITERAL) -> Set(new 
AccessControlEntry(user2.toString, host1, READ, ALLOW)),
-      new ResourcePattern(GROUP, WILDCARD_RESOURCE, LITERAL) -> acls,
-      new ResourcePattern(GROUP, "test-ConsumerGroup", LITERAL) -> acls
-    )
-
-    resourceToAcls foreach { case (key, value) => 
changeAclAndVerify(Set.empty, value, Set.empty, key) }
-    val expectedAcls = (resourceToAcls + (resource -> acls)).flatMap {
-      case (res, resAcls) => resAcls.map { acl => new AclBinding(res, acl) }
-    }.toSet
-    TestUtils.waitUntilTrue(() => expectedAcls == getAcls(authorizer1), 
"changes not propagated in timeout period.")
-
-    //test remove acl from existing acls.
-    acls = changeAclAndVerify(acls, Set.empty, Set(acl1, acl5))
-
-    //test remove all acls for resource
-    removeAcls(authorizer1, Set.empty, resource)
-    TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], authorizer1, 
resource, AccessControlEntryFilter.ANY)
-
-    acls = changeAclAndVerify(Set.empty, Set(acl1), Set.empty)
-    changeAclAndVerify(acls, Set.empty, acls)
-  }
-
-  @Test
-  def testLocalConcurrentModificationOfResourceAcls(): Unit = {
-    val commonResource = new ResourcePattern(TOPIC, "test", LITERAL)
-
-    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
-    val acl1 = new AccessControlEntry(user1.toString, WILDCARD_HOST, READ, 
ALLOW)
-
-    val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
-    val acl2 = new AccessControlEntry(user2.toString, WILDCARD_HOST, READ, 
DENY)
-
-    addAcls(authorizer1, Set(acl1), commonResource)
-    addAcls(authorizer1, Set(acl2), commonResource)
-
-    TestUtils.waitAndVerifyAcls(Set(acl1, acl2), authorizer1, commonResource, 
AccessControlEntryFilter.ANY)
-  }
-
-  /**
-   * Test ACL inheritance, as described in 
#{org.apache.kafka.common.acl.AclOperation}
-   */
-  @Test
-  def testAclInheritance(): Unit = {
-    testImplicationsOfAllow(AclOperation.ALL, Set(READ, WRITE, CREATE, DELETE, 
ALTER, DESCRIBE,
-      CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, 
CREATE_TOKENS, DESCRIBE_TOKENS, TWO_PHASE_COMMIT))
-    testImplicationsOfDeny(AclOperation.ALL, Set(READ, WRITE, CREATE, DELETE, 
ALTER, DESCRIBE,
-      CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, 
CREATE_TOKENS, DESCRIBE_TOKENS, TWO_PHASE_COMMIT))
-    testImplicationsOfAllow(READ, Set(DESCRIBE))
-    testImplicationsOfAllow(WRITE, Set(DESCRIBE))
-    testImplicationsOfAllow(DELETE, Set(DESCRIBE))
-    testImplicationsOfAllow(ALTER, Set(DESCRIBE))
-    testImplicationsOfDeny(DESCRIBE, Set())
-    testImplicationsOfAllow(ALTER_CONFIGS, Set(DESCRIBE_CONFIGS))
-    testImplicationsOfDeny(DESCRIBE_CONFIGS, Set())
-  }
-
-  private def testImplicationsOfAllow(parentOp: AclOperation, allowedOps: 
Set[AclOperation]): Unit = {
-    val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
-    val host = InetAddress.getByName("192.168.3.1")
-    val hostContext = newRequestContext(user, host)
-    val acl = new AccessControlEntry(user.toString, WILDCARD_HOST, parentOp, 
ALLOW)
-    addAcls(authorizer1, Set(acl), clusterResource)
-    AclOperation.values.filter(validOp).foreach { op =>
-      val authorized = authorize(authorizer1, hostContext, op, clusterResource)
-      if (allowedOps.contains(op) || op == parentOp)
-        assertTrue(authorized, s"ALLOW $parentOp should imply ALLOW $op")
-      else
-        assertFalse(authorized, s"ALLOW $parentOp should not imply ALLOW $op")
-    }
-    removeAcls(authorizer1, Set(acl), clusterResource)
-  }
-
-  private def testImplicationsOfDeny(parentOp: AclOperation, deniedOps: 
Set[AclOperation]): Unit = {
-    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
-    val host1 = InetAddress.getByName("192.168.3.1")
-    val host1Context = newRequestContext(user1, host1)
-    val acls = Set(new AccessControlEntry(user1.toString, WILDCARD_HOST, 
parentOp, DENY),
-      new AccessControlEntry(user1.toString, WILDCARD_HOST, AclOperation.ALL, 
ALLOW))
-    addAcls(authorizer1, acls, clusterResource)
-    AclOperation.values.filter(validOp).foreach { op =>
-      val authorized = authorize(authorizer1, host1Context, op, 
clusterResource)
-      if (deniedOps.contains(op) || op == parentOp)
-        assertFalse(authorized, s"DENY $parentOp should imply DENY $op")
-      else
-        assertTrue(authorized, s"DENY $parentOp should not imply DENY $op")
-    }
-    removeAcls(authorizer1, acls, clusterResource)
-  }
-
-  @Test
-  def testAccessAllowedIfAllowAclExistsOnWildcardResource(): Unit = {
-    addAcls(authorizer1, Set(allowReadAcl), wildCardResource)
-
-    assertTrue(authorize(authorizer1, requestContext, READ, resource))
-  }
-
-  @Test
-  def testDeleteAclOnWildcardResource(): Unit = {
-    addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), wildCardResource)
-
-    removeAcls(authorizer1, Set(allowReadAcl), wildCardResource)
-
-    assertEquals(Set(allowWriteAcl), getAcls(authorizer1, wildCardResource))
-  }
-
-  @Test
-  def testDeleteAllAclOnWildcardResource(): Unit = {
-    addAcls(authorizer1, Set(allowReadAcl), wildCardResource)
-
-    removeAcls(authorizer1, Set.empty, wildCardResource)
-
-    assertEquals(Set.empty, getAcls(authorizer1))
-  }
-
-  @Test
-  def testAccessAllowedIfAllowAclExistsOnPrefixedResource(): Unit = {
-    addAcls(authorizer1, Set(allowReadAcl), prefixedResource)
-
-    assertTrue(authorize(authorizer1, requestContext, READ, resource))
-  }
-
-  @Test
-  def testDeleteAclOnPrefixedResource(): Unit = {
-    addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), prefixedResource)
-
-    removeAcls(authorizer1, Set(allowReadAcl), prefixedResource)
-
-    assertEquals(Set(allowWriteAcl), getAcls(authorizer1, prefixedResource))
-  }
-
-  @Test
-  def testDeleteAllAclOnPrefixedResource(): Unit = {
-    addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), prefixedResource)
-
-    removeAcls(authorizer1, Set.empty, prefixedResource)
-
-    assertEquals(Set.empty, getAcls(authorizer1))
-  }
-
-  @Test
-  def testAddAclsOnLiteralResource(): Unit = {
-    addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), resource)
-    addAcls(authorizer1, Set(allowWriteAcl, denyReadAcl), resource)
-
-    assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), 
getAcls(authorizer1, resource))
-    assertEquals(Set.empty, getAcls(authorizer1, wildCardResource))
-    assertEquals(Set.empty, getAcls(authorizer1, prefixedResource))
-  }
-
-  @Test
-  def testAddAclsOnWildcardResource(): Unit = {
-    addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), wildCardResource)
-    addAcls(authorizer1, Set(allowWriteAcl, denyReadAcl), wildCardResource)
-
-    assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), 
getAcls(authorizer1, wildCardResource))
-    assertEquals(Set.empty, getAcls(authorizer1, resource))
-    assertEquals(Set.empty, getAcls(authorizer1, prefixedResource))
-  }
-
-  @Test
-  def testAddAclsOnPrefixedResource(): Unit = {
-    addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), prefixedResource)
-    addAcls(authorizer1, Set(allowWriteAcl, denyReadAcl), prefixedResource)
-
-    assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), 
getAcls(authorizer1, prefixedResource))
-    assertEquals(Set.empty, getAcls(authorizer1, wildCardResource))
-    assertEquals(Set.empty, getAcls(authorizer1, resource))
-  }
-
-  @Test
-  def testAuthorizeWithPrefixedResource(): Unit = {
-    addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, 
"a_other", LITERAL))
-    addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, 
"a_other", PREFIXED))
-    addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "foo-" + 
UUID.randomUUID(), PREFIXED))
-    addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "foo-" + 
UUID.randomUUID(), PREFIXED))
-    addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "foo-" + 
UUID.randomUUID() + "-zzz", PREFIXED))
-    addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "fooo-" 
+ UUID.randomUUID(), PREFIXED))
-    addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "fo-" + 
UUID.randomUUID(), PREFIXED))
-    addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "fop-" + 
UUID.randomUUID(), PREFIXED))
-    addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "fon-" + 
UUID.randomUUID(), PREFIXED))
-    addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "fon-", 
PREFIXED))
-    addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, 
"z_other", PREFIXED))
-    addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, 
"z_other", LITERAL))
-
-    addAcls(authorizer1, Set(allowReadAcl), prefixedResource)
-
-    assertTrue(authorize(authorizer1, requestContext, READ, resource))
-  }
-
-  @Test
-  def testSingleCharacterResourceAcls(): Unit = {
-    addAcls(authorizer1, Set(allowReadAcl), new ResourcePattern(TOPIC, "f", 
LITERAL))
-    assertTrue(authorize(authorizer1, requestContext, READ, new 
ResourcePattern(TOPIC, "f", LITERAL)))
-    assertFalse(authorize(authorizer1, requestContext, READ, new 
ResourcePattern(TOPIC, "foo", LITERAL)))
-
-    addAcls(authorizer1, Set(allowReadAcl), new ResourcePattern(TOPIC, "_", 
PREFIXED))
-    assertTrue(authorize(authorizer1, requestContext, READ, new 
ResourcePattern(TOPIC, "_foo", LITERAL)))
-    assertTrue(authorize(authorizer1, requestContext, READ, new 
ResourcePattern(TOPIC, "_", LITERAL)))
-    assertFalse(authorize(authorizer1, requestContext, READ, new 
ResourcePattern(TOPIC, "foo_", LITERAL)))
-  }
-
-  @Test
-  def testGetAclsPrincipal(): Unit = {
-    val aclOnSpecificPrincipal = new AccessControlEntry(principal.toString, 
WILDCARD_HOST, WRITE, ALLOW)
-    addAcls(authorizer1, Set(aclOnSpecificPrincipal), resource)
-
-    assertEquals(0,
-      getAcls(authorizer1, wildcardPrincipal).size, "acl on specific should 
not be returned for wildcard request")
-    assertEquals(1,
-      getAcls(authorizer1, principal).size, "acl on specific should be 
returned for specific request")
-    assertEquals(1,
-      getAcls(authorizer1, new KafkaPrincipal(principal.getPrincipalType, 
principal.getName)).size, "acl on specific should be returned for different 
principal instance")
-
-    removeAcls(authorizer1, Set.empty, resource)
-    val aclOnWildcardPrincipal = new 
AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, WRITE, ALLOW)
-    addAcls(authorizer1, Set(aclOnWildcardPrincipal), resource)
-
-    assertEquals(1, getAcls(authorizer1, wildcardPrincipal).size, "acl on 
wildcard should be returned for wildcard request")
-    assertEquals(0, getAcls(authorizer1, principal).size, "acl on wildcard 
should not be returned for specific request")
-  }
-
-  @Test
-  def testAclsFilter(): Unit = {
-    val resource1 = new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), 
LITERAL)
-    val resource2 = new ResourcePattern(TOPIC, "bar-" + UUID.randomUUID(), 
LITERAL)
-    val prefixedResource = new ResourcePattern(TOPIC, "bar-", PREFIXED)
-
-    val acl1 = new AclBinding(resource1, new 
AccessControlEntry(principal.toString, WILDCARD_HOST, READ, ALLOW))
-    val acl2 = new AclBinding(resource1, new 
AccessControlEntry(principal.toString, "192.168.0.1", WRITE, ALLOW))
-    val acl3 = new AclBinding(resource2, new 
AccessControlEntry(principal.toString, WILDCARD_HOST, DESCRIBE, ALLOW))
-    val acl4 = new AclBinding(prefixedResource, new 
AccessControlEntry(wildcardPrincipal.toString, WILDCARD_HOST, READ, ALLOW))
-
-    authorizer1.createAcls(requestContext, util.List.of(acl1, acl2, acl3, 
acl4))
-    assertEquals(Set(acl1, acl2, acl3, acl4), 
authorizer1.acls(AclBindingFilter.ANY).asScala.toSet)
-    assertEquals(Set(acl1, acl2), authorizer1.acls(new 
AclBindingFilter(resource1.toFilter, 
AccessControlEntryFilter.ANY)).asScala.toSet)
-    assertEquals(Set(acl4), authorizer1.acls(new 
AclBindingFilter(prefixedResource.toFilter, 
AccessControlEntryFilter.ANY)).asScala.toSet)
-    val matchingFilter = new AclBindingFilter(new 
ResourcePatternFilter(ResourceType.ANY, resource2.name, MATCH), 
AccessControlEntryFilter.ANY)
-    assertEquals(Set(acl3, acl4), 
authorizer1.acls(matchingFilter).asScala.toSet)
-
-    val filters = List(matchingFilter,
-      acl1.toFilter,
-      new AclBindingFilter(resource2.toFilter, AccessControlEntryFilter.ANY),
-      new AclBindingFilter(new ResourcePatternFilter(TOPIC, "baz", 
PatternType.ANY), AccessControlEntryFilter.ANY))
-    val deleteResults = authorizer1.deleteAcls(requestContext, 
filters.asJava).asScala.map(_.toCompletableFuture.get)
-    assertEquals(List.empty, deleteResults.filter(_.exception.isPresent))
-    filters.indices.foreach { i =>
-      assertEquals(Set.empty, 
deleteResults(i).aclBindingDeleteResults.asScala.toSet.filter(_.exception.isPresent))
-    }
-    assertEquals(Set(acl3, acl4), 
deleteResults(0).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
-    assertEquals(Set(acl1), 
deleteResults(1).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
-    // standard authorizer first finds the acls that match filters and then 
delete them.
-    // So filters[2] will match acl3 even though it is also matching 
filters[0] and will be deleted by it
-    assertEquals(Set(acl3), 
deleteResults(2).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
-    assertEquals(Set.empty, 
deleteResults(3).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
-  }
-
-  @Test
-  def testAuthorizeByResourceTypeNoAclFoundOverride(): Unit = {
-    val props = properties
-    props.put(StandardAuthorizer.ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, 
"true")
-
-    val cfg = KafkaConfig.fromProps(props)
-    val authorizer: Authorizer = createAuthorizer()
-    try {
-      configureAuthorizer(authorizer, cfg.originals, new PluginMetricsImpl(new 
Metrics(), util.Map.of()))
-      assertTrue(authorizeByResourceType(authorizer, requestContext, READ, 
resource.resourceType()),
-        "If allow.everyone.if.no.acl.found = true, caller should have read 
access to at least one topic")
-      assertTrue(authorizeByResourceType(authorizer, requestContext, WRITE, 
resource.resourceType()),
-        "If allow.everyone.if.no.acl.found = true, caller should have write 
access to at least one topic")
-    } finally {
-      authorizer.close()
-    }
-  }
-
-  private def changeAclAndVerify(originalAcls: Set[AccessControlEntry],
-                                 addedAcls: Set[AccessControlEntry],
-                                 removedAcls: Set[AccessControlEntry],
-                                 resource: ResourcePattern = resource): 
Set[AccessControlEntry] = {
-    var acls = originalAcls
-
-    if (addedAcls.nonEmpty) {
-      addAcls(authorizer1, addedAcls, resource)
-      acls ++= addedAcls
-    }
-
-    if (removedAcls.nonEmpty) {
-      removeAcls(authorizer1, removedAcls, resource)
-      acls --= removedAcls
-    }
-
-    TestUtils.waitAndVerifyAcls(acls, authorizer1, resource, 
AccessControlEntryFilter.ANY)
-
-    acls
-  }
-
-  private def authorize(authorizer: Authorizer, requestContext: 
RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = {
-    val action = new Action(operation, resource, 1, true, true)
-    authorizer.authorize(requestContext, util.List.of(action)).asScala.head == 
AuthorizationResult.ALLOWED
-  }
-
-  private def getAcls(authorizer: Authorizer, resourcePattern: 
ResourcePattern): Set[AccessControlEntry] = {
-    val acls = authorizer.acls(new AclBindingFilter(resourcePattern.toFilter, 
AccessControlEntryFilter.ANY)).asScala.toSet
-    acls.map(_.entry)
-  }
-
-  private def getAcls(authorizer: Authorizer, principal: KafkaPrincipal): 
Set[AclBinding] = {
-    val filter = new AclBindingFilter(ResourcePatternFilter.ANY,
-      new AccessControlEntryFilter(principal.toString, null, AclOperation.ANY, 
AclPermissionType.ANY))
-    authorizer.acls(filter).asScala.toSet
-  }
-
-  private def getAcls(authorizer: Authorizer): Set[AclBinding] = {
-    authorizer.acls(AclBindingFilter.ANY).asScala.toSet
-  }
-
-  private def validOp(op: AclOperation): Boolean = {
-    op != AclOperation.ANY && op != AclOperation.UNKNOWN
-  }
-
-  def createAuthorizer(): Authorizer = {
-    new StandardAuthorizer
-  }
-
-  def configureAuthorizer(authorizer: Authorizer,
-                          configs: util.Map[String, AnyRef],
-                          pluginMetrics: PluginMetrics): Unit = {
-    configureStandardAuthorizer(authorizer.asInstanceOf[StandardAuthorizer], 
configs, pluginMetrics)
-  }
-
-  def configureStandardAuthorizer(standardAuthorizer: StandardAuthorizer,
-                                  configs: util.Map[String, AnyRef],
-                                  pluginMetrics: PluginMetrics): Unit = {
-    standardAuthorizer.configure(configs)
-    standardAuthorizer.withPluginMetrics(pluginMetrics)
-    initializeStandardAuthorizer(standardAuthorizer, new 
AuthorizerTestServerInfo(util.List.of(PLAINTEXT)))
-  }
-
-  def initializeStandardAuthorizer(standardAuthorizer: StandardAuthorizer,
-                                   serverInfo: AuthorizerServerInfo): Unit = {
-    val aclMutator = new MockAclMutator(standardAuthorizer)
-    standardAuthorizer.start(serverInfo)
-    standardAuthorizer.setAclMutator(aclMutator)
-    standardAuthorizer.completeInitialLoad()
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/security/authorizer/BaseAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/BaseAuthorizerTest.scala
deleted file mode 100644
index c7726ff5245..00000000000
--- a/core/src/test/scala/unit/kafka/security/authorizer/BaseAuthorizerTest.scala
+++ /dev/null
@@ -1,373 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.security.authorizer
-
-import java.net.InetAddress
-import java.util.UUID
-import kafka.server.KafkaConfig
-import org.apache.kafka.common.acl.AclOperation.{ALL, READ, WRITE}
-import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
-import org.apache.kafka.common.acl.{AccessControlEntry, 
AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation}
-import org.apache.kafka.common.network.{ClientInformation, ListenerName}
-import org.apache.kafka.common.protocol.ApiKeys
-import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
-import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
-import org.apache.kafka.common.resource.ResourcePattern.WILDCARD_RESOURCE
-import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC, 
TRANSACTIONAL_ID}
-import org.apache.kafka.common.resource.{ResourcePattern, ResourceType}
-import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
-import org.apache.kafka.security.authorizer.AclEntry
-import org.apache.kafka.security.authorizer.AclEntry.{WILDCARD_HOST, 
WILDCARD_PRINCIPAL_STRING}
-import org.apache.kafka.server.authorizer.{AuthorizationResult, Authorizer}
-import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue}
-import org.junit.jupiter.api.Test
-
-import scala.jdk.CollectionConverters._
-
-trait BaseAuthorizerTest {
-
-  def authorizer: Authorizer
-
-  val superUsers = "User:superuser1; User:superuser2"
-  val username = "alice"
-  val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
-  val requestContext: RequestContext = newRequestContext(principal, 
InetAddress.getByName("192.168.0.1"))
-  val superUserName = "superuser1"
-  var config: KafkaConfig = _
-  var resource: ResourcePattern = _
-
-  @Test
-  def testAuthorizeByResourceTypeMultipleAddAndRemove(): Unit = {
-    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
-    val host1 = InetAddress.getByName("192.168.1.1")
-    val resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(), 
LITERAL)
-    val denyRead = new AccessControlEntry(user1.toString, 
host1.getHostAddress, READ, DENY)
-    val allowRead = new AccessControlEntry(user1.toString, 
host1.getHostAddress, READ, ALLOW)
-    val u1h1Context = newRequestContext(user1, host1)
-
-    for (_ <- 1 to 10) {
-      assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC),
-        "User1 from host1 should not have READ access to any topic when no ACL 
exists")
-
-      addAcls(authorizer, Set(allowRead), resource1)
-      assertTrue(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC),
-        "User1 from host1 now should have READ access to at least one topic")
-
-      for (_ <- 1 to 10) {
-        addAcls(authorizer, Set(denyRead), resource1)
-        assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC),
-          "User1 from host1 now should not have READ access to any topic")
-
-        removeAcls(authorizer, Set(denyRead), resource1)
-        addAcls(authorizer, Set(allowRead), resource1)
-        assertTrue(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC),
-          "User1 from host1 now should have READ access to at least one topic")
-      }
-
-      removeAcls(authorizer, Set(allowRead), resource1)
-      assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC),
-        "User1 from host1 now should not have READ access to any topic")
-    }
-  }
-
-  @Test
-  def testAuthorizeByResourceTypeIsolationUnrelatedDenyWontDominateAllow(): 
Unit = {
-    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
-    val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user2")
-    val host1 = InetAddress.getByName("192.168.1.1")
-    val host2 = InetAddress.getByName("192.168.1.2")
-    val resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(), 
LITERAL)
-    val resource2 = new ResourcePattern(TOPIC, "sb2" + UUID.randomUUID(), 
LITERAL)
-    val resource3 = new ResourcePattern(GROUP, "s", PREFIXED)
-
-    val acl1 = new AccessControlEntry(user1.toString, host1.getHostAddress, 
READ, DENY)
-    val acl2 = new AccessControlEntry(user2.toString, host1.getHostAddress, 
READ, DENY)
-    val acl3 = new AccessControlEntry(user1.toString, host2.getHostAddress, 
WRITE, DENY)
-    val acl4 = new AccessControlEntry(user1.toString, host2.getHostAddress, 
READ, DENY)
-    val acl5 = new AccessControlEntry(user1.toString, host2.getHostAddress, 
READ, DENY)
-    val acl6 = new AccessControlEntry(user2.toString, host2.getHostAddress, 
READ, DENY)
-    val acl7 = new AccessControlEntry(user1.toString, host2.getHostAddress, 
READ, ALLOW)
-
-    addAcls(authorizer, Set(acl1, acl2, acl3, acl6, acl7), resource1)
-    addAcls(authorizer, Set(acl4), resource2)
-    addAcls(authorizer, Set(acl5), resource3)
-
-    val u1h1Context = newRequestContext(user1, host1)
-    val u1h2Context = newRequestContext(user1, host2)
-
-    assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC),
-      "User1 from host1 should not have READ access to any topic")
-    assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.GROUP),
-      "User1 from host2 should not have READ access to any consumer group")
-    assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TRANSACTIONAL_ID),
-      "User1 from host2 should not have READ access to any topic")
-    assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.CLUSTER),
-      "User1 from host2 should not have READ access to any topic")
-    assertTrue(authorizeByResourceType(authorizer, u1h2Context, READ, 
ResourceType.TOPIC),
-      "User1 from host2 should have READ access to at least one topic")
-  }
-
-  @Test
-  def testAuthorizeByResourceTypeDenyTakesPrecedence(): Unit = {
-    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
-    val host1 = InetAddress.getByName("192.168.1.1")
-    val resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(), 
LITERAL)
-
-    val u1h1Context = newRequestContext(user1, host1)
-    val acl1 = new AccessControlEntry(user1.toString, host1.getHostAddress, 
WRITE, ALLOW)
-    val acl2 = new AccessControlEntry(user1.toString, host1.getHostAddress, 
WRITE, DENY)
-
-    addAcls(authorizer, Set(acl1), resource1)
-    assertTrue(authorizeByResourceType(authorizer, u1h1Context, WRITE, 
ResourceType.TOPIC),
-      "User1 from host1 should have WRITE access to at least one topic")
-
-    addAcls(authorizer, Set(acl2), resource1)
-    assertFalse(authorizeByResourceType(authorizer, u1h1Context, WRITE, 
ResourceType.TOPIC),
-      "User1 from host1 should not have WRITE access to any topic")
-  }
-
-  @Test
-  def testAuthorizeByResourceTypePrefixedResourceDenyDominate(): Unit = {
-    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
-    val host1 = InetAddress.getByName("192.168.1.1")
-    val a = new ResourcePattern(GROUP, "a", PREFIXED)
-    val ab = new ResourcePattern(GROUP, "ab", PREFIXED)
-    val abc = new ResourcePattern(GROUP, "abc", PREFIXED)
-    val abcd = new ResourcePattern(GROUP, "abcd", PREFIXED)
-    val abcde = new ResourcePattern(GROUP, "abcde", PREFIXED)
-
-    val u1h1Context = newRequestContext(user1, host1)
-    val allowAce = new AccessControlEntry(user1.toString, 
host1.getHostAddress, READ, ALLOW)
-    val denyAce = new AccessControlEntry(user1.toString, host1.getHostAddress, 
READ, DENY)
-
-    addAcls(authorizer, Set(allowAce), abcde)
-    assertTrue(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.GROUP),
-      "User1 from host1 should have READ access to at least one group")
-
-    addAcls(authorizer, Set(denyAce), abcd)
-    assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.GROUP),
-      "User1 from host1 now should not have READ access to any group")
-
-    addAcls(authorizer, Set(allowAce), abc)
-    assertTrue(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.GROUP),
-      "User1 from host1 now should have READ access to any group")
-
-    addAcls(authorizer, Set(denyAce), a)
-    assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.GROUP),
-      "User1 from host1 now should not have READ access to any group")
-
-    addAcls(authorizer, Set(allowAce), ab)
-    assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.GROUP),
-      "User1 from host1 still should not have READ access to any group")
-  }
-
-  @Test
-  def testAuthorizeByResourceTypeWildcardResourceDenyDominate(): Unit = {
-    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
-    val host1 = InetAddress.getByName("192.168.1.1")
-    val wildcard = new ResourcePattern(GROUP, 
ResourcePattern.WILDCARD_RESOURCE, LITERAL)
-    val prefixed = new ResourcePattern(GROUP, "hello", PREFIXED)
-    val literal = new ResourcePattern(GROUP, "aloha", LITERAL)
-
-    val u1h1Context = newRequestContext(user1, host1)
-    val allowAce = new AccessControlEntry(user1.toString, 
host1.getHostAddress, WRITE, ALLOW)
-    val denyAce = new AccessControlEntry(user1.toString, host1.getHostAddress, 
WRITE, DENY)
-
-    addAcls(authorizer, Set(allowAce), prefixed)
-    assertTrue(authorizeByResourceType(authorizer, u1h1Context, WRITE, 
ResourceType.GROUP),
-      "User1 from host1 should have WRITE access to at least one group")
-
-    addAcls(authorizer, Set(denyAce), wildcard)
-    assertFalse(authorizeByResourceType(authorizer, u1h1Context, WRITE, 
ResourceType.GROUP),
-      "User1 from host1 now should not have WRITE access to any group")
-
-    addAcls(authorizer, Set(allowAce), wildcard)
-    assertFalse(authorizeByResourceType(authorizer, u1h1Context, WRITE, 
ResourceType.GROUP),
-      "User1 from host1 still should not have WRITE access to any group")
-
-    addAcls(authorizer, Set(allowAce), literal)
-    assertFalse(authorizeByResourceType(authorizer, u1h1Context, WRITE, 
ResourceType.GROUP),
-      "User1 from host1 still should not have WRITE access to any group")
-  }
-
-  @Test
-  def testAuthorizeByResourceTypeWithAllOperationAce(): Unit = {
-    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
-    val host1 = InetAddress.getByName("192.168.1.1")
-    val resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(), 
LITERAL)
-    val denyAll = new AccessControlEntry(user1.toString, host1.getHostAddress, 
ALL, DENY)
-    val allowAll = new AccessControlEntry(user1.toString, 
host1.getHostAddress, ALL, ALLOW)
-    val denyWrite = new AccessControlEntry(user1.toString, 
host1.getHostAddress, WRITE, DENY)
-    val u1h1Context = newRequestContext(user1, host1)
-
-    assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC),
-      "User1 from host1 should not have READ access to any topic when no ACL 
exists")
-
-    addAcls(authorizer, Set(denyWrite, allowAll), resource1)
-    assertTrue(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC),
-      "User1 from host1 now should have READ access to at least one topic")
-
-    addAcls(authorizer, Set(denyAll), resource1)
-    assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC),
-      "User1 from host1 now should not have READ access to any topic")
-  }
-
-  @Test
-  def testAuthorizeByResourceTypeWithAllHostAce(): Unit = {
-    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
-    val host1 = InetAddress.getByName("192.168.1.1")
-    val host2 = InetAddress.getByName("192.168.1.2")
-    val allHost = AclEntry.WILDCARD_HOST
-    val resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(), 
LITERAL)
-    val resource2 = new ResourcePattern(TOPIC, "sb2" + UUID.randomUUID(), 
LITERAL)
-    val allowHost1 = new AccessControlEntry(user1.toString, 
host1.getHostAddress, READ, ALLOW)
-    val denyHost1 = new AccessControlEntry(user1.toString, 
host1.getHostAddress, READ, DENY)
-    val denyAllHost = new AccessControlEntry(user1.toString, allHost, READ, 
DENY)
-    val allowAllHost = new AccessControlEntry(user1.toString, allHost, READ, 
ALLOW)
-    val u1h1Context = newRequestContext(user1, host1)
-    val u1h2Context = newRequestContext(user1, host2)
-
-    assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC),
-      "User1 from host1 should not have READ access to any topic when no ACL 
exists")
-
-    addAcls(authorizer, Set(allowHost1), resource1)
-    assertTrue(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC),
-      "User1 from host1 should now have READ access to at least one topic")
-
-    addAcls(authorizer, Set(denyAllHost), resource1)
-    assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC),
-      "User1 from host1 now shouldn't have READ access to any topic")
-
-    addAcls(authorizer, Set(denyHost1), resource2)
-    assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC),
-      "User1 from host1 still should not have READ access to any topic")
-    assertFalse(authorizeByResourceType(authorizer, u1h2Context, READ, 
ResourceType.TOPIC),
-      "User1 from host2 should not have READ access to any topic")
-
-    addAcls(authorizer, Set(allowAllHost), resource2)
-    assertTrue(authorizeByResourceType(authorizer, u1h2Context, READ, 
ResourceType.TOPIC),
-      "User1 from host2 should now have READ access to at least one topic")
-
-    addAcls(authorizer, Set(denyAllHost), resource2)
-    assertFalse(authorizeByResourceType(authorizer, u1h2Context, READ, 
ResourceType.TOPIC),
-      "User1 from host2 now shouldn't have READ access to any topic")
-  }
-
-  @Test
-  def testAuthorizeByResourceTypeWithAllPrincipalAce(): Unit = {
-    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
-    val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user2")
-    val allUser = AclEntry.WILDCARD_PRINCIPAL_STRING
-    val host1 = InetAddress.getByName("192.168.1.1")
-    val resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(), 
LITERAL)
-    val resource2 = new ResourcePattern(TOPIC, "sb2" + UUID.randomUUID(), 
LITERAL)
-    val allowUser1 = new AccessControlEntry(user1.toString, 
host1.getHostAddress, READ, ALLOW)
-    val denyUser1 = new AccessControlEntry(user1.toString, 
host1.getHostAddress, READ, DENY)
-    val denyAllUser = new AccessControlEntry(allUser, host1.getHostAddress, 
READ, DENY)
-    val allowAllUser = new AccessControlEntry(allUser, host1.getHostAddress, 
READ, ALLOW)
-    val u1h1Context = newRequestContext(user1, host1)
-    val u2h1Context = newRequestContext(user2, host1)
-
-    assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC),
-      "User1 from host1 should not have READ access to any topic when no ACL 
exists")
-
-    addAcls(authorizer, Set(allowUser1), resource1)
-    assertTrue(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC),
-      "User1 from host1 should now have READ access to at least one topic")
-
-    addAcls(authorizer, Set(denyAllUser), resource1)
-    assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC),
-      "User1 from host1 now shouldn't have READ access to any topic")
-
-    addAcls(authorizer, Set(denyUser1), resource2)
-    assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC),
-      "User1 from host1 still should not have READ access to any topic")
-    assertFalse(authorizeByResourceType(authorizer, u2h1Context, READ, 
ResourceType.TOPIC),
-      "User2 from host1 should not have READ access to any topic")
-
-    addAcls(authorizer, Set(allowAllUser), resource2)
-    assertTrue(authorizeByResourceType(authorizer, u2h1Context, READ, 
ResourceType.TOPIC),
-      "User2 from host1 should now have READ access to at least one topic")
-
-    addAcls(authorizer, Set(denyAllUser), resource2)
-    assertFalse(authorizeByResourceType(authorizer, u2h1Context, READ, 
ResourceType.TOPIC),
-      "User2 from host1 now shouldn't have READ access to any topic")
-  }
-
-  @Test
-  def testAuthorizeByResourceTypeSuperUserHasAccess(): Unit = {
-    val denyAllAce = new AccessControlEntry(WILDCARD_PRINCIPAL_STRING, 
WILDCARD_HOST, AclOperation.ALL, DENY)
-    val superUser1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
superUserName)
-    val host1 = InetAddress.getByName("192.0.4.4")
-    val allTopicsResource = new ResourcePattern(TOPIC, WILDCARD_RESOURCE, 
LITERAL)
-    val clusterResource = new ResourcePattern(CLUSTER, WILDCARD_RESOURCE, 
LITERAL)
-    val groupResource = new ResourcePattern(GROUP, WILDCARD_RESOURCE, LITERAL)
-    val transactionIdResource = new ResourcePattern(TRANSACTIONAL_ID, 
WILDCARD_RESOURCE, LITERAL)
-
-    addAcls(authorizer, Set(denyAllAce), allTopicsResource)
-    addAcls(authorizer, Set(denyAllAce), clusterResource)
-    addAcls(authorizer, Set(denyAllAce), groupResource)
-    addAcls(authorizer, Set(denyAllAce), transactionIdResource)
-
-    val superUserContext = newRequestContext(superUser1, host1)
-
-    assertTrue(authorizeByResourceType(authorizer, superUserContext, READ, 
ResourceType.TOPIC),
-      "superuser always has access, no matter what acls.")
-    assertTrue(authorizeByResourceType(authorizer, superUserContext, READ, 
ResourceType.CLUSTER),
-      "superuser always has access, no matter what acls.")
-    assertTrue(authorizeByResourceType(authorizer, superUserContext, READ, 
ResourceType.GROUP),
-      "superuser always has access, no matter what acls.")
-    assertTrue(authorizeByResourceType(authorizer, superUserContext, READ, 
ResourceType.TRANSACTIONAL_ID),
-      "superuser always has access, no matter what acls.")
-  }
-
-  def newRequestContext(principal: KafkaPrincipal, clientAddress: InetAddress, 
apiKey: ApiKeys = ApiKeys.PRODUCE): RequestContext = {
-    val securityProtocol = SecurityProtocol.SASL_PLAINTEXT
-    val header = new RequestHeader(apiKey, 2, "", 1) //ApiKeys apiKey, short 
version, String clientId, int correlation
-    new RequestContext(header, "", clientAddress, principal, 
ListenerName.forSecurityProtocol(securityProtocol),
-      securityProtocol, ClientInformation.EMPTY, false)
-  }
-
-  def authorizeByResourceType(authorizer: Authorizer, requestContext: 
RequestContext, operation: AclOperation, resourceType: ResourceType) : Boolean 
= {
-    authorizer.authorizeByResourceType(requestContext, operation, 
resourceType) == AuthorizationResult.ALLOWED
-  }
-
-  def addAcls(authorizer: Authorizer, aces: Set[AccessControlEntry], 
resourcePattern: ResourcePattern): Unit = {
-    val bindings = aces.map { ace => new AclBinding(resourcePattern, ace) }
-    authorizer.createAcls(requestContext, bindings.toList.asJava).asScala
-      .map(_.toCompletableFuture.get)
-      .foreach { result => result.exception.ifPresent { e => throw e } }
-  }
-
-  def removeAcls(authorizer: Authorizer, aces: Set[AccessControlEntry], 
resourcePattern: ResourcePattern): Boolean = {
-    val bindings = if (aces.isEmpty)
-      Set(new AclBindingFilter(resourcePattern.toFilter, 
AccessControlEntryFilter.ANY) )
-    else
-      aces.map { ace => new AclBinding(resourcePattern, ace).toFilter }
-    authorizer.deleteAcls(requestContext, bindings.toList.asJava).asScala
-      .map(_.toCompletableFuture.get)
-      .forall { result =>
-        result.exception.ifPresent { e => throw e }
-        result.aclBindingDeleteResults.forEach { r =>
-          r.exception.ifPresent { e => throw e }
-        }
-        !result.aclBindingDeleteResults.isEmpty
-      }
-  }
-
-}
diff --git a/server-common/src/testFixtures/java/org/apache/kafka/server/util/ServerTestUtils.java b/server-common/src/testFixtures/java/org/apache/kafka/server/util/ServerTestUtils.java
index f3898848768..8d1bad658e6 100644
--- a/server-common/src/testFixtures/java/org/apache/kafka/server/util/ServerTestUtils.java
+++ b/server-common/src/testFixtures/java/org/apache/kafka/server/util/ServerTestUtils.java
@@ -17,10 +17,21 @@
 
 package org.apache.kafka.server.util;
 
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.server.authorizer.Authorizer;
 import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+import org.apache.kafka.test.TestUtils;
 
 import com.yammer.metrics.core.Gauge;
 
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
 public final class ServerTestUtils {
 
     /**
@@ -46,4 +57,28 @@ public final class ServerTestUtils {
         return (Number) gauge.value();
     }
 
+    /**
+     * Wait until the set of ACL entries the authorizer returns for the given resource equals the expected set.
+     *
+     * @param expected                 the exact set of access control entries that must be observed (compared with {@code Set.equals}).
+     * @param authorizer               the authorizer to query; it is queried inside the wait condition and again by the message supplier.
+     * @param resource                 the resource pattern whose {@code toFilter()} selects the ACL bindings to inspect.
+     * @param accessControlEntryFilter additional filter for the access control entries, combined with the resource filter.
+     */
+    public static void waitAndVerifyAcls(Set<AccessControlEntry> expected,
+                                         Authorizer authorizer,
+                                         ResourcePattern resource,
+                                         AccessControlEntryFilter 
accessControlEntryFilter) throws InterruptedException {
+        String newLine = System.lineSeparator();
+        String delimiter = newLine + "\t";
+        AclBindingFilter filter = new AclBindingFilter(resource.toFilter(), 
accessControlEntryFilter);
+        TestUtils.waitForCondition(() -> 
StreamSupport.stream(authorizer.acls(filter).spliterator(), false)
+                        .map(AclBinding::entry)
+                        .collect(Collectors.toSet()).equals(expected),
+                45000, // wait limit handed to waitForCondition; presumably milliseconds (45 s) - confirm
+                () -> "expected acls:" + 
expected.stream().map(Object::toString).collect(Collectors.joining(delimiter, 
delimiter, newLine))
+                        + "but got:" + 
StreamSupport.stream(authorizer.acls(filter).spliterator(), false)
+                        .map(b -> 
b.entry().toString()).collect(Collectors.joining(delimiter, delimiter, 
newLine)));
+    }
+
 }
diff --git a/server/src/test/java/org/apache/kafka/security/authorizer/AuthorizerTest.java b/server/src/test/java/org/apache/kafka/security/authorizer/AuthorizerTest.java
new file mode 100644
index 00000000000..926d185548d
--- /dev/null
+++ b/server/src/test/java/org/apache/kafka/security/authorizer/AuthorizerTest.java
@@ -0,0 +1,1069 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security.authorizer;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.internals.PluginMetricsImpl;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.internals.SecurityUtils;
+import org.apache.kafka.controller.MockAclMutator;
+import org.apache.kafka.metadata.authorizer.AuthorizerTestServerInfo;
+import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
+import org.apache.kafka.raft.KRaftConfigs;
+import org.apache.kafka.server.authorizer.AclCreateResult;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.apache.kafka.server.authorizer.Authorizer;
+import org.apache.kafka.server.util.ServerTestUtils;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletionStage;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.apache.kafka.common.acl.AclOperation.ALL;
+import static org.apache.kafka.common.acl.AclOperation.ALTER;
+import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.CLUSTER_ACTION;
+import static org.apache.kafka.common.acl.AclOperation.CREATE;
+import static org.apache.kafka.common.acl.AclOperation.CREATE_TOKENS;
+import static org.apache.kafka.common.acl.AclOperation.DELETE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_TOKENS;
+import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE;
+import static org.apache.kafka.common.acl.AclOperation.READ;
+import static org.apache.kafka.common.acl.AclOperation.TWO_PHASE_COMMIT;
+import static org.apache.kafka.common.acl.AclOperation.WRITE;
+import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
+import static org.apache.kafka.common.acl.AclPermissionType.DENY;
+import static org.apache.kafka.common.resource.PatternType.LITERAL;
+import static org.apache.kafka.common.resource.PatternType.MATCH;
+import static org.apache.kafka.common.resource.PatternType.PREFIXED;
+import static org.apache.kafka.common.resource.Resource.CLUSTER_NAME;
+import static 
org.apache.kafka.common.resource.ResourcePattern.WILDCARD_RESOURCE;
+import static org.apache.kafka.common.resource.ResourceType.CLUSTER;
+import static org.apache.kafka.common.resource.ResourceType.GROUP;
+import static org.apache.kafka.common.resource.ResourceType.TOPIC;
+import static org.apache.kafka.common.resource.ResourceType.TRANSACTIONAL_ID;
+import static org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST;
+import static 
org.apache.kafka.security.authorizer.AclEntry.WILDCARD_PRINCIPAL_STRING;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class AuthorizerTest {
+
+    // Endpoint named "PLAINTEXT" on 127.0.0.1:9020 using the PLAINTEXT security protocol.
+    private final Endpoint plaintext = new Endpoint("PLAINTEXT", 
SecurityProtocol.PLAINTEXT, "127.0.0.1", 9020);
+
+    // Access control entries that name the wildcard principal and the wildcard host.
+    private final AccessControlEntry allowReadAcl = new 
AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, READ, ALLOW);
+    private final AccessControlEntry allowWriteAcl = new 
AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, WRITE, ALLOW);
+    private final AccessControlEntry denyReadAcl = new 
AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, READ, DENY);
+
+    // Shared resource patterns: the wildcard topic, topics prefixed with "foo", and the cluster.
+    private final ResourcePattern wildCardResource = new 
ResourcePattern(TOPIC, WILDCARD_RESOURCE, LITERAL);
+    private final ResourcePattern prefixedResource = new 
ResourcePattern(TOPIC, "foo", PREFIXED);
+    private final ResourcePattern clusterResource = new 
ResourcePattern(CLUSTER, CLUSTER_NAME, LITERAL);
+    // The wildcard principal string parsed into a KafkaPrincipal.
+    private final KafkaPrincipal wildcardPrincipal = 
SecurityUtils.parseKafkaPrincipal(WILDCARD_PRINCIPAL_STRING);
+
+    // Default user; setup() builds the default request context for this principal.
+    private final String username = "alice";
+    private final KafkaPrincipal principal = new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, username);
+    // Instances closed in tearDown(); presumably registered by the authorizer-creation helper — confirm.
+    private final List<Metrics> metricsInstances = new ArrayList<>();
+    private final List<PluginMetricsImpl> pluginMetricsInstances = new 
ArrayList<>();
+
+    // Per-test fixtures, (re)assigned in setup().
+    private Authorizer authorizer;
+    private RequestContext requestContext;
+    private ResourcePattern resource;
+
+    /**
+     * A {@link KafkaPrincipal} subclass whose {@code equals} never reports equality,
+     * while its hash code is inherited unchanged from the parent class.
+     */
+    static class CustomPrincipal extends KafkaPrincipal {
+
+        public CustomPrincipal(String principalType, String name) {
+            super(principalType, name);
+        }
+
+        @Override
+        public int hashCode() {
+            return super.hashCode();
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            // Deliberately unequal to everything, this instance included.
+            return false;
+        }
+    }
+
+    /**
+     * Prepares the per-test fixtures: a request context for the default principal coming
+     * from 192.168.0.1, an authorizer built from {@link #configs()}, and a literal topic
+     * resource whose name carries a random UUID suffix.
+     */
+    @BeforeEach
+    public void setup() throws Exception {
+        InetAddress clientAddress = InetAddress.getByName("192.168.0.1");
+        requestContext = newRequestContext(principal, clientAddress);
+
+        Map<String, Object> authorizerConfigs = configs();
+        authorizer = createAuthorizer(authorizerConfigs);
+
+        String topicName = "foo-" + UUID.randomUUID();
+        resource = new ResourcePattern(TOPIC, topicName, LITERAL);
+    }
+
+    /**
+     * Releases what a test created: the authorizer first, then every plugin-metrics and
+     * metrics instance held in the tracking lists.
+     * <p>
+     * The metrics are closed in a {@code finally} block so that a failure while closing
+     * the authorizer does not leave them open.
+     *
+     * @throws Exception if closing the authorizer or any metrics instance fails
+     */
+    @AfterEach
+    public void tearDown() throws Exception {
+        try {
+            // authorizer is only assigned in setup(); guard against setup() failing before that point.
+            if (authorizer != null) {
+                authorizer.close();
+            }
+        } finally {
+            for (PluginMetricsImpl pluginMetric : pluginMetricsInstances) {
+                pluginMetric.close();
+            }
+            for (Metrics metrics : metricsInstances) {
+                metrics.close();
+            }
+        }
+    }
+
+    /**
+     * Builds the authorizer configuration used by {@code setup()}: two super users and node id "0".
+     *
+     * @return a new mutable map holding the two settings
+     */
+    private Map<String, Object> configs() {
+        Map<String, Object> settings = new HashMap<>();
+        settings.put(StandardAuthorizer.SUPER_USERS_CONFIG, "User:superuser1; User:superuser2");
+        settings.put(KRaftConfigs.NODE_ID_CONFIG, "0");
+        return settings;
+    }
+
+    /**
+     * Repeatedly adds and removes ALLOW and DENY READ entries on a single topic and checks,
+     * after every change, whether the principal can READ at least one topic.
+     */
+    @Test
+    public void testAuthorizeByResourceTypeMultipleAddAndRemove() throws Exception {
+        KafkaPrincipal client = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1");
+        InetAddress clientHost = InetAddress.getByName("192.168.1.1");
+        ResourcePattern topic = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(), LITERAL);
+        AccessControlEntry denyReadAce = new AccessControlEntry(client.toString(), clientHost.getHostAddress(), READ, DENY);
+        AccessControlEntry allowReadAce = new AccessControlEntry(client.toString(), clientHost.getHostAddress(), READ, ALLOW);
+        RequestContext context = newRequestContext(client, clientHost);
+
+        for (int round = 0; round < 10; round++) {
+            // Every round starts and ends with no ACLs on the topic.
+            assertFalse(authorizeByResourceType(authorizer, context, READ, TOPIC),
+                    "User1 from host1 should not have READ access to any topic when no ACL exists");
+
+            addAcls(authorizer, Set.of(allowReadAce), topic);
+            assertTrue(authorizeByResourceType(authorizer, context, READ, TOPIC),
+                    "User1 from host1 now should have READ access to at least one topic");
+
+            for (int toggle = 0; toggle < 10; toggle++) {
+                addAcls(authorizer, Set.of(denyReadAce), topic);
+                assertFalse(authorizeByResourceType(authorizer, context, READ, TOPIC),
+                        "User1 from host1 now should not have READ access to any topic");
+
+                removeAcls(authorizer, Set.of(denyReadAce), topic);
+                addAcls(authorizer, Set.of(allowReadAce), topic);
+                assertTrue(authorizeByResourceType(authorizer, context, READ, TOPIC),
+                        "User1 from host1 now should have READ access to at least one topic");
+            }
+
+            removeAcls(authorizer, Set.of(allowReadAce), topic);
+            assertFalse(authorizeByResourceType(authorizer, context, READ, TOPIC),
+                    "User1 from host1 now should not have READ access to any topic");
+        }
+    }
+
+    /**
+     * Installs DENY entries for other users, hosts, operations and resources next to a single
+     * ALLOW READ entry for user1 from host2, and checks that only user1 from host2 can READ a topic.
+     */
+    @Test
+    public void testAuthorizeByResourceTypeIsolationUnrelatedDenyWontDominateAllow() throws Exception {
+        KafkaPrincipal firstUser = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1");
+        KafkaPrincipal secondUser = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user2");
+        InetAddress firstHost = InetAddress.getByName("192.168.1.1");
+        InetAddress secondHost = InetAddress.getByName("192.168.1.2");
+        ResourcePattern firstTopic = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(), LITERAL);
+        ResourcePattern secondTopic = new ResourcePattern(TOPIC, "sb2" + UUID.randomUUID(), LITERAL);
+        ResourcePattern prefixedGroup = new ResourcePattern(GROUP, "s", PREFIXED);
+
+        AccessControlEntry denyUser1Host1Read = new AccessControlEntry(firstUser.toString(), firstHost.getHostAddress(), READ, DENY);
+        AccessControlEntry denyUser2Host1Read = new AccessControlEntry(secondUser.toString(), firstHost.getHostAddress(), READ, DENY);
+        AccessControlEntry denyUser1Host2Write = new AccessControlEntry(firstUser.toString(), secondHost.getHostAddress(), WRITE, DENY);
+        AccessControlEntry denyUser1Host2ReadOtherTopic = new AccessControlEntry(firstUser.toString(), secondHost.getHostAddress(), READ, DENY);
+        AccessControlEntry denyUser1Host2ReadGroup = new AccessControlEntry(firstUser.toString(), secondHost.getHostAddress(), READ, DENY);
+        AccessControlEntry denyUser2Host2Read = new AccessControlEntry(secondUser.toString(), secondHost.getHostAddress(), READ, DENY);
+        AccessControlEntry allowUser1Host2Read = new AccessControlEntry(firstUser.toString(), secondHost.getHostAddress(), READ, ALLOW);
+
+        addAcls(authorizer,
+                Set.of(denyUser1Host1Read, denyUser2Host1Read, denyUser1Host2Write, denyUser2Host2Read, allowUser1Host2Read),
+                firstTopic);
+        addAcls(authorizer, Set.of(denyUser1Host2ReadOtherTopic), secondTopic);
+        addAcls(authorizer, Set.of(denyUser1Host2ReadGroup), prefixedGroup);
+
+        RequestContext user1FromHost1 = newRequestContext(firstUser, firstHost);
+        RequestContext user1FromHost2 = newRequestContext(firstUser, secondHost);
+
+        assertFalse(authorizeByResourceType(authorizer, user1FromHost1, READ, TOPIC),
+                "User1 from host1 should not have READ access to any topic");
+        assertFalse(authorizeByResourceType(authorizer, user1FromHost1, READ, GROUP),
+                "User1 from host1 should not have READ access to any consumer group");
+        assertFalse(authorizeByResourceType(authorizer, user1FromHost1, READ, TRANSACTIONAL_ID),
+                "User1 from host1 should not have READ access to any transactional id");
+        assertFalse(authorizeByResourceType(authorizer, user1FromHost1, READ, CLUSTER),
+                "User1 from host1 should not have READ access to the cluster");
+        assertTrue(authorizeByResourceType(authorizer, user1FromHost2, READ, TOPIC),
+                "User1 from host2 should have READ access to at least one topic");
+    }
+
+    /**
+     * Adds an ALLOW WRITE entry and then a DENY WRITE entry on the same topic, expecting
+     * WRITE access by resource type to be granted after the first and refused after the second.
+     */
+    @Test
+    public void testAuthorizeByResourceTypeDenyTakesPrecedence() throws Exception {
+        KafkaPrincipal client = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1");
+        InetAddress clientHost = InetAddress.getByName("192.168.1.1");
+        ResourcePattern topic = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(), LITERAL);
+
+        RequestContext context = newRequestContext(client, clientHost);
+        AccessControlEntry allowWrite = new AccessControlEntry(client.toString(), clientHost.getHostAddress(), WRITE, ALLOW);
+        AccessControlEntry denyWrite = new AccessControlEntry(client.toString(), clientHost.getHostAddress(), WRITE, DENY);
+
+        addAcls(authorizer, Set.of(allowWrite), topic);
+        assertTrue(authorizeByResourceType(authorizer, context, WRITE, TOPIC),
+                "User1 from host1 should have WRITE access to at least one topic");
+
+        addAcls(authorizer, Set.of(denyWrite), topic);
+        assertFalse(authorizeByResourceType(authorizer, context, WRITE, TOPIC),
+                "User1 from host1 should not have WRITE access to any topic");
+    }
+
+    /**
+     * Alternates ALLOW and DENY READ entries on the nested group prefixes "abcde", "abcd",
+     * "abc", "a" and "ab" and checks, after each addition, whether the principal is expected
+     * to have READ access to at least one group.
+     */
+    @Test
+    public void testAuthorizeByResourceTypePrefixedResourceDenyDominate() throws Exception {
+        KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1");
+        InetAddress host1 = InetAddress.getByName("192.168.1.1");
+        ResourcePattern a = new ResourcePattern(GROUP, "a", PREFIXED);
+        ResourcePattern ab = new ResourcePattern(GROUP, "ab", PREFIXED);
+        ResourcePattern abc = new ResourcePattern(GROUP, "abc", PREFIXED);
+        ResourcePattern abcd = new ResourcePattern(GROUP, "abcd", PREFIXED);
+        ResourcePattern abcde = new ResourcePattern(GROUP, "abcde", PREFIXED);
+
+        RequestContext u1h1Context = newRequestContext(user1, host1);
+        AccessControlEntry allowAce = new AccessControlEntry(user1.toString(), host1.getHostAddress(), READ, ALLOW);
+        AccessControlEntry denyAce = new AccessControlEntry(user1.toString(), host1.getHostAddress(), READ, DENY);
+
+        // ALLOW on the longest prefix only.
+        addAcls(authorizer, Set.of(allowAce), abcde);
+        assertTrue(authorizeByResourceType(authorizer, u1h1Context, READ, GROUP),
+                "User1 from host1 should have READ access to at least one group");
+
+        // DENY on a shorter prefix of the only allowed one.
+        addAcls(authorizer, Set.of(denyAce), abcd);
+        assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, GROUP),
+                "User1 from host1 now should not have READ access to any group");
+
+        // ALLOW on a prefix shorter than the denied one.
+        addAcls(authorizer, Set.of(allowAce), abc);
+        assertTrue(authorizeByResourceType(authorizer, u1h1Context, READ, GROUP),
+                "User1 from host1 now should have READ access to at least one group");
+
+        // DENY on the shortest prefix.
+        addAcls(authorizer, Set.of(denyAce), a);
+        assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, GROUP),
+                "User1 from host1 now should not have READ access to any group");
+
+        // ALLOW on "ab", which is still longer than the denied prefix "a".
+        addAcls(authorizer, Set.of(allowAce), ab);
+        assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, GROUP),
+                "User1 from host1 still should not have READ access to any group");
+    }
+
+    /**
+     * Grants WRITE on a prefixed group, then adds a DENY on the wildcard group and checks that
+     * later ALLOW entries on the wildcard or on a literal group are not expected to restore access.
+     */
+    @Test
+    public void testAuthorizeByResourceTypeWildcardResourceDenyDominate() throws Exception {
+        KafkaPrincipal client = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1");
+        InetAddress clientHost = InetAddress.getByName("192.168.1.1");
+        ResourcePattern anyGroup = new ResourcePattern(GROUP, WILDCARD_RESOURCE, LITERAL);
+        ResourcePattern helloGroups = new ResourcePattern(GROUP, "hello", PREFIXED);
+        ResourcePattern alohaGroup = new ResourcePattern(GROUP, "aloha", LITERAL);
+
+        RequestContext context = newRequestContext(client, clientHost);
+        AccessControlEntry allowWrite = new AccessControlEntry(client.toString(), clientHost.getHostAddress(), WRITE, ALLOW);
+        AccessControlEntry denyWrite = new AccessControlEntry(client.toString(), clientHost.getHostAddress(), WRITE, DENY);
+
+        addAcls(authorizer, Set.of(allowWrite), helloGroups);
+        assertTrue(authorizeByResourceType(authorizer, context, WRITE, GROUP),
+                "User1 from host1 should have WRITE access to at least one group");
+
+        addAcls(authorizer, Set.of(denyWrite), anyGroup);
+        assertFalse(authorizeByResourceType(authorizer, context, WRITE, GROUP),
+                "User1 from host1 now should not have WRITE access to any group");
+
+        addAcls(authorizer, Set.of(allowWrite), anyGroup);
+        assertFalse(authorizeByResourceType(authorizer, context, WRITE, GROUP),
+                "User1 from host1 still should not have WRITE access to any group");
+
+        addAcls(authorizer, Set.of(allowWrite), alohaGroup);
+        assertFalse(authorizeByResourceType(authorizer, context, WRITE, GROUP),
+                "User1 from host1 still should not have WRITE access to any group");
+    }
+
+    /**
+     * Checks READ access by resource type when the entries on the topic use the ALL operation:
+     * expected to be granted with ALLOW ALL plus DENY WRITE, and refused once DENY ALL is added.
+     */
+    @Test
+    public void testAuthorizeByResourceTypeWithAllOperationAce() throws Exception {
+        KafkaPrincipal client = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1");
+        InetAddress clientHost = InetAddress.getByName("192.168.1.1");
+        ResourcePattern topic = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(), LITERAL);
+        AccessControlEntry denyEverything = new AccessControlEntry(client.toString(), clientHost.getHostAddress(), ALL, DENY);
+        AccessControlEntry allowEverything = new AccessControlEntry(client.toString(), clientHost.getHostAddress(), ALL, ALLOW);
+        AccessControlEntry denyWriteOnly = new AccessControlEntry(client.toString(), clientHost.getHostAddress(), WRITE, DENY);
+        RequestContext context = newRequestContext(client, clientHost);
+
+        assertFalse(authorizeByResourceType(authorizer, context, READ, TOPIC),
+                "User1 from host1 should not have READ access to any topic when no ACL exists");
+
+        addAcls(authorizer, Set.of(denyWriteOnly, allowEverything), topic);
+        assertTrue(authorizeByResourceType(authorizer, context, READ, TOPIC),
+                "User1 from host1 now should have READ access to at least one topic");
+
+        addAcls(authorizer, Set.of(denyEverything), topic);
+        assertFalse(authorizeByResourceType(authorizer, context, READ, TOPIC),
+                "User1 from host1 now should not have READ access to any topic");
+    }
+
+    /**
+     * Mixes entries bound to one host with entries bound to the wildcard host on two topics and
+     * checks READ access by resource type for the same user coming from two different hosts.
+     */
+    @Test
+    public void testAuthorizeByResourceTypeWithAllHostAce() throws Exception {
+        KafkaPrincipal client = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1");
+        InetAddress firstHost = InetAddress.getByName("192.168.1.1");
+        InetAddress secondHost = InetAddress.getByName("192.168.1.2");
+        String anyHost = WILDCARD_HOST;
+        ResourcePattern firstTopic = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(), LITERAL);
+        ResourcePattern secondTopic = new ResourcePattern(TOPIC, "sb2" + UUID.randomUUID(), LITERAL);
+        AccessControlEntry allowFromFirstHost = new AccessControlEntry(client.toString(), firstHost.getHostAddress(), READ, ALLOW);
+        AccessControlEntry denyFromFirstHost = new AccessControlEntry(client.toString(), firstHost.getHostAddress(), READ, DENY);
+        AccessControlEntry denyFromAnyHost = new AccessControlEntry(client.toString(), anyHost, READ, DENY);
+        AccessControlEntry allowFromAnyHost = new AccessControlEntry(client.toString(), anyHost, READ, ALLOW);
+        RequestContext fromFirstHost = newRequestContext(client, firstHost);
+        RequestContext fromSecondHost = newRequestContext(client, secondHost);
+
+        assertFalse(authorizeByResourceType(authorizer, fromFirstHost, READ, TOPIC),
+                "User1 from host1 should not have READ access to any topic when no ACL exists");
+
+        addAcls(authorizer, Set.of(allowFromFirstHost), firstTopic);
+        assertTrue(authorizeByResourceType(authorizer, fromFirstHost, READ, TOPIC),
+                "User1 from host1 should now have READ access to at least one topic");
+
+        addAcls(authorizer, Set.of(denyFromAnyHost), firstTopic);
+        assertFalse(authorizeByResourceType(authorizer, fromFirstHost, READ, TOPIC),
+                "User1 from host1 now shouldn't have READ access to any topic");
+
+        addAcls(authorizer, Set.of(denyFromFirstHost), secondTopic);
+        assertFalse(authorizeByResourceType(authorizer, fromFirstHost, READ, TOPIC),
+                "User1 from host1 still should not have READ access to any topic");
+        assertFalse(authorizeByResourceType(authorizer, fromSecondHost, READ, TOPIC),
+                "User1 from host2 should not have READ access to any topic");
+
+        addAcls(authorizer, Set.of(allowFromAnyHost), secondTopic);
+        assertTrue(authorizeByResourceType(authorizer, fromSecondHost, READ, TOPIC),
+                "User1 from host2 should now have READ access to at least one topic");
+
+        addAcls(authorizer, Set.of(denyFromAnyHost), secondTopic);
+        assertFalse(authorizeByResourceType(authorizer, fromSecondHost, READ, TOPIC),
+                "User1 from host2 now shouldn't have READ access to any topic");
+    }
+
+    /**
+     * Mixes entries bound to one user with entries bound to the wildcard principal on two topics
+     * and checks READ access by resource type for two different users coming from the same host.
+     */
+    @Test
+    public void testAuthorizeByResourceTypeWithAllPrincipalAce() throws Exception {
+        KafkaPrincipal firstUser = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1");
+        KafkaPrincipal secondUser = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user2");
+        String anyUser = WILDCARD_PRINCIPAL_STRING;
+        InetAddress clientHost = InetAddress.getByName("192.168.1.1");
+        ResourcePattern firstTopic = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(), LITERAL);
+        ResourcePattern secondTopic = new ResourcePattern(TOPIC, "sb2" + UUID.randomUUID(), LITERAL);
+        AccessControlEntry allowFirstUser = new AccessControlEntry(firstUser.toString(), clientHost.getHostAddress(), READ, ALLOW);
+        AccessControlEntry denyFirstUser = new AccessControlEntry(firstUser.toString(), clientHost.getHostAddress(), READ, DENY);
+        AccessControlEntry denyAnyUser = new AccessControlEntry(anyUser, clientHost.getHostAddress(), READ, DENY);
+        AccessControlEntry allowAnyUser = new AccessControlEntry(anyUser, clientHost.getHostAddress(), READ, ALLOW);
+        RequestContext firstUserContext = newRequestContext(firstUser, clientHost);
+        RequestContext secondUserContext = newRequestContext(secondUser, clientHost);
+
+        assertFalse(authorizeByResourceType(authorizer, firstUserContext, READ, TOPIC),
+                "User1 from host1 should not have READ access to any topic when no ACL exists");
+
+        addAcls(authorizer, Set.of(allowFirstUser), firstTopic);
+        assertTrue(authorizeByResourceType(authorizer, firstUserContext, READ, TOPIC),
+                "User1 from host1 should now have READ access to at least one topic");
+
+        addAcls(authorizer, Set.of(denyAnyUser), firstTopic);
+        assertFalse(authorizeByResourceType(authorizer, firstUserContext, READ, TOPIC),
+                "User1 from host1 now shouldn't have READ access to any topic");
+
+        addAcls(authorizer, Set.of(denyFirstUser), secondTopic);
+        assertFalse(authorizeByResourceType(authorizer, firstUserContext, READ, TOPIC),
+                "User1 from host1 still should not have READ access to any topic");
+        assertFalse(authorizeByResourceType(authorizer, secondUserContext, READ, TOPIC),
+                "User2 from host1 should not have READ access to any topic");
+
+        addAcls(authorizer, Set.of(allowAnyUser), secondTopic);
+        assertTrue(authorizeByResourceType(authorizer, secondUserContext, READ, TOPIC),
+                "User2 from host1 should now have READ access to at least one topic");
+
+        addAcls(authorizer, Set.of(denyAnyUser), secondTopic);
+        assertFalse(authorizeByResourceType(authorizer, secondUserContext, READ, TOPIC),
+                "User2 from host1 now shouldn't have READ access to any topic");
+    }
+
+    /**
+     * Installs a DENY ALL entry for every principal and host on the wildcard resource of each
+     * of four resource types, then checks that "superuser1" is still granted READ on all of them.
+     */
+    @Test
+    public void testAuthorizeByResourceTypeSuperUserHasAccess() throws Exception {
+        AccessControlEntry denyEveryone = new AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, ALL, DENY);
+        KafkaPrincipal superUser = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser1");
+        InetAddress clientHost = InetAddress.getByName("192.0.4.4");
+        List<ResourceType> resourceTypes = List.of(TOPIC, CLUSTER, GROUP, TRANSACTIONAL_ID);
+
+        for (ResourceType type : resourceTypes) {
+            addAcls(authorizer, Set.of(denyEveryone), new ResourcePattern(type, WILDCARD_RESOURCE, LITERAL));
+        }
+
+        RequestContext superUserContext = newRequestContext(superUser, clientHost);
+
+        for (ResourceType type : resourceTypes) {
+            assertTrue(authorizeByResourceType(authorizer, superUserContext, READ, type),
+                    "superuser always has access, no matter what acls.");
+        }
+    }
+
+    /**
+     * Authorizing against a PREFIXED resource pattern is expected to throw
+     * {@link IllegalArgumentException}.
+     */
+    @Test
+    public void testAuthorizeThrowsOnNonLiteralResource() {
+        assertThrows(IllegalArgumentException.class, () -> {
+            ResourcePattern prefixedTopic = new ResourcePattern(TOPIC, "something", PREFIXED);
+            authorize(authorizer, requestContext, READ, prefixedTopic);
+        });
+    }
+
+    /**
+     * READ on a group with an empty name is expected to be refused without ACLs and granted
+     * once an ALLOW READ entry exists on the wildcard group.
+     */
+    @Test
+    public void testAuthorizeWithEmptyResourceName() throws Exception {
+        ResourcePattern unnamedGroup = new ResourcePattern(GROUP, "", LITERAL);
+        ResourcePattern anyGroup = new ResourcePattern(GROUP, WILDCARD_RESOURCE, LITERAL);
+
+        assertFalse(authorize(authorizer, requestContext, READ, unnamedGroup));
+        addAcls(authorizer, Set.of(allowReadAcl), anyGroup);
+        assertTrue(authorize(authorizer, requestContext, READ, unnamedGroup));
+    }
+
+    // Adding an ACL for a resource with an empty name is expected to be rejected with an ApiException.
+    @Test
+    public void testEmptyAclThrowsException() {
+        assertThrows(ApiException.class, () -> {
+            ResourcePattern unnamedGroup = new ResourcePattern(GROUP, "", LITERAL);
+            addAcls(authorizer, Set.of(allowReadAcl), unnamedGroup);
+        });
+    }
+
+    /**
+     * Installs a mix of host-specific and wildcard-host entries for three users on the test
+     * topic and checks READ, WRITE, DESCRIBE and ALTER decisions for each user/host pair.
+     */
+    @Test
+    public void testTopicAcl() throws Exception {
+        KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username);
+        KafkaPrincipal user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "rob");
+        KafkaPrincipal user3 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "batman");
+        InetAddress host1 = InetAddress.getByName("192.168.1.1");
+        InetAddress host2 = InetAddress.getByName("192.168.1.2");
+
+        // user1 is allowed to READ from host1 and from host2 ...
+        AccessControlEntry user1AllowReadHost1 = new AccessControlEntry(user1.toString(), host1.getHostAddress(), READ, ALLOW);
+        AccessControlEntry user1AllowReadHost2 = new AccessControlEntry(user1.toString(), host2.getHostAddress(), READ, ALLOW);
+
+        // ... but is also denied READ from host1.
+        AccessControlEntry user1DenyReadHost1 = new AccessControlEntry(user1.toString(), host1.getHostAddress(), READ, DENY);
+
+        // user1 is allowed to WRITE from host1 only.
+        AccessControlEntry user1AllowWriteHost1 = new AccessControlEntry(user1.toString(), host1.getHostAddress(), WRITE, ALLOW);
+
+        // user1 is allowed to DESCRIBE from every host.
+        AccessControlEntry user1AllowDescribeAnyHost = new AccessControlEntry(user1.toString(), WILDCARD_HOST, DESCRIBE, ALLOW);
+
+        // user2 is allowed to READ from every host.
+        AccessControlEntry user2AllowReadAnyHost = new AccessControlEntry(user2.toString(), WILDCARD_HOST, READ, ALLOW);
+
+        // user3 is allowed to WRITE from every host.
+        AccessControlEntry user3AllowWriteAnyHost = new AccessControlEntry(user3.toString(), WILDCARD_HOST, WRITE, ALLOW);
+
+        Set<AccessControlEntry> entries = Set.of(
+                user1AllowReadHost1,
+                user1AllowReadHost2,
+                user1DenyReadHost1,
+                user1AllowWriteHost1,
+                user1AllowDescribeAnyHost,
+                user2AllowReadAnyHost,
+                user3AllowWriteAnyHost);
+
+        changeAclAndVerify(Set.of(), entries, Set.of());
+
+        RequestContext user1FromHost1 = newRequestContext(user1, host1);
+        RequestContext user1FromHost2 = newRequestContext(user1, host2);
+
+        assertTrue(authorize(authorizer, user1FromHost2, READ, resource), "User1 should have READ access from host2");
+        assertFalse(authorize(authorizer, user1FromHost1, READ, resource), "User1 should not have READ access from host1 due to denyAcl");
+        assertTrue(authorize(authorizer, user1FromHost1, WRITE, resource), "User1 should have WRITE access from host1");
+        assertFalse(authorize(authorizer, user1FromHost2, WRITE, resource), "User1 should not have WRITE access from host2 as no allow acl is defined");
+        assertTrue(authorize(authorizer, user1FromHost1, DESCRIBE, resource), "User1 should have DESCRIBE access from host1");
+        assertTrue(authorize(authorizer, user1FromHost2, DESCRIBE, resource), "User1 should have DESCRIBE access from host2");
+        assertFalse(authorize(authorizer, user1FromHost1, ALTER, resource), "User1 should not have edit access from host1");
+        assertFalse(authorize(authorizer, user1FromHost2, ALTER, resource), "User1 should not have edit access from host2");
+
+        // A user holding only READ or only WRITE is expected to be granted DESCRIBE as well.
+        RequestContext user2FromHost1 = newRequestContext(user2, host1);
+        RequestContext user3FromHost1 = newRequestContext(user3, host1);
+        assertTrue(authorize(authorizer, user2FromHost1, DESCRIBE, resource), "User2 should have DESCRIBE access from host1");
+        assertTrue(authorize(authorizer, user3FromHost1, DESCRIBE, resource), "User3 should have DESCRIBE access from host1");
+        assertTrue(authorize(authorizer, user2FromHost1, READ, resource), "User2 should have READ access from host1");
+        assertTrue(authorize(authorizer, user3FromHost1, WRITE, resource), "User3 should have WRITE access from host1");
+    }
+
+    /**
+     * ACLs written for a plain {@link KafkaPrincipal} are expected to apply to a
+     * {@code CustomPrincipal} with the same principal type and name, even though the
+     * custom principal's {@code equals} never returns true.
+     */
+    @Test
+    public void testAllowAccessWithCustomPrincipal() throws Exception {
+        KafkaPrincipal plainUser = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username);
+        CustomPrincipal customUser = new CustomPrincipal(KafkaPrincipal.USER_TYPE, username);
+        InetAddress firstHost = InetAddress.getByName("192.168.1.1");
+        InetAddress secondHost = InetAddress.getByName("192.168.1.2");
+
+        // READ is denied from the first host and allowed from the second.
+        AccessControlEntry denyReadFirstHost = new AccessControlEntry(plainUser.toString(), firstHost.getHostAddress(), READ, DENY);
+        AccessControlEntry allowReadSecondHost = new AccessControlEntry(plainUser.toString(), secondHost.getHostAddress(), READ, ALLOW);
+        Set<AccessControlEntry> entries = Set.of(denyReadFirstHost, allowReadSecondHost);
+        changeAclAndVerify(Set.of(), entries, Set.of());
+
+        RequestContext fromFirstHost = newRequestContext(customUser, firstHost);
+        RequestContext fromSecondHost = newRequestContext(customUser, secondHost);
+
+        assertTrue(authorize(authorizer, fromSecondHost, READ, resource), "User1 should have READ access from host2");
+        assertFalse(authorize(authorizer, fromFirstHost, READ, resource), "User1 should not have READ access from host1 due to denyAcl");
+    }
+
+    @Test
+    public void testDenyTakesPrecedence() throws Exception {
+        KafkaPrincipal user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
username);
+        InetAddress host = InetAddress.getByName("192.168.2.1");
+        RequestContext session = newRequestContext(user, host);
+
+        AccessControlEntry allowAll = new 
AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, AclOperation.ALL, 
ALLOW);
+        AccessControlEntry denyAcl = new AccessControlEntry(user.toString(), 
host.getHostAddress(), AclOperation.ALL, DENY);
+        Set<AccessControlEntry> acls = Set.of(allowAll, denyAcl);
+
+        changeAclAndVerify(Set.of(), acls, Set.of());
+
+        assertFalse(authorize(authorizer, session, READ, resource), "deny 
should take precedence over allow.");
+    }
+
+    @Test
+    public void testAllowAllAccess() throws Exception {
+        AccessControlEntry allowAllAcl = new 
AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, AclOperation.ALL, 
ALLOW);
+
+        changeAclAndVerify(Set.of(), Set.of(allowAllAcl), Set.of());
+
+        RequestContext context = newRequestContext(new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "random"), 
InetAddress.getByName("192.0.4.4"));
+        assertTrue(authorize(authorizer, context, READ, resource), "allow all 
acl should allow access to all.");
+    }
+
+    @Test
+    public void testSuperUserHasAccess() throws Exception {
+        AccessControlEntry denyAllAcl = new 
AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, AclOperation.ALL, 
DENY);
+
+        changeAclAndVerify(Set.of(), Set.of(denyAllAcl), Set.of());
+
+        RequestContext session1 = newRequestContext(new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"), 
InetAddress.getByName("192.0.4.4"));
+        RequestContext session2 = newRequestContext(new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser2"), 
InetAddress.getByName("192.0.4.4"));
+
+        assertTrue(authorize(authorizer, session1, READ, resource), "superuser 
always has access, no matter what acls.");
+        assertTrue(authorize(authorizer, session2, READ, resource), "superuser 
always has access, no matter what acls.");
+    }
+
+    /**
+     * CustomPrincipals should be compared with their principal type and name
+     */
+    @Test
+    public void testSuperUserWithCustomPrincipalHasAccess() throws Exception {
+        AccessControlEntry denyAllAcl = new 
AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, AclOperation.ALL, 
DENY);
+        changeAclAndVerify(Set.of(), Set.of(denyAllAcl), Set.of());
+
+        RequestContext session = newRequestContext(new 
CustomPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"), 
InetAddress.getByName("192.0.4.4"));
+
+        assertTrue(authorize(authorizer, session, READ, resource), "superuser 
with custom principal always has access, no matter what acls.");
+    }
+
+    @Test
+    public void testWildCardAcls() throws Exception {
+        assertFalse(authorize(authorizer, requestContext, READ, resource), 
"when acls = [], authorizer should fail close.");
+
+        KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
username);
+        InetAddress host1 = InetAddress.getByName("192.168.3.1");
+        AccessControlEntry readAcl = new AccessControlEntry(user1.toString(), 
host1.getHostAddress(), READ, ALLOW);
+
+        Set<AccessControlEntry> acls = changeAclAndVerify(Set.of(), 
Set.of(readAcl), Set.of(), wildCardResource);
+
+        RequestContext host1Context = newRequestContext(user1, host1);
+        assertTrue(authorize(authorizer, host1Context, READ, resource), "User1 
should have READ access from host1");
+
+        //allow WRITE to specific topic.
+        AccessControlEntry writeAcl = new AccessControlEntry(user1.toString(), 
host1.getHostAddress(), WRITE, ALLOW);
+        changeAclAndVerify(Set.of(), Set.of(writeAcl), Set.of());
+
+        //deny WRITE to wild card topic.
+        AccessControlEntry denyWriteOnWildCardResourceAcl = new 
AccessControlEntry(user1.toString(), host1.getHostAddress(), WRITE, DENY);
+        changeAclAndVerify(acls, Set.of(denyWriteOnWildCardResourceAcl), 
Set.of(), wildCardResource);
+
+        assertFalse(authorize(authorizer, host1Context, WRITE, resource), 
"User1 should not have WRITE access from host1");
+    }
+
+    @Test
+    public void testNoAclFound() {
+        assertFalse(authorize(authorizer, requestContext, READ, resource), 
"when acls = [], authorizer should deny op.");
+    }
+
+    @Test
+    public void testNoAclFoundOverride() throws IOException {
+        Map<String, Object> cfg = configs();
+        cfg.put(StandardAuthorizer.ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, 
"true");
+
+        try (Authorizer testAuthorizer = createAuthorizer(cfg)) {
+            assertTrue(authorize(testAuthorizer, requestContext, READ, 
resource),
+                    "when acls = null or [],  authorizer should allow op with 
allow.everyone = true.");
+        }
+    }
+
+    @Test
+    public void testAclConfigWithWhitespace() throws IOException {
+        Map<String, Object> cfg = configs();
+        cfg.put(StandardAuthorizer.ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, " 
true");
+        // pad every property value with leading & trailing whitespace
+        cfg.replaceAll((k, v) -> " " + v + " ");
+
+        try (Authorizer testAuthorizer = createAuthorizer(cfg)) {
+            assertTrue(authorize(testAuthorizer, requestContext, READ, 
resource),
+                    "when acls = null or [],  authorizer should allow op with 
allow.everyone = true.");
+        }
+    }
+
+    @Test
+    public void testAclManagementAPIs() throws Exception {
+        KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
username);
+        KafkaPrincipal user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
"bob");
+        String host1 = "host1";
+        String host2 = "host2";
+
+        AccessControlEntry acl1 = new AccessControlEntry(user1.toString(), 
host1, READ, ALLOW);
+        AccessControlEntry acl2 = new AccessControlEntry(user1.toString(), 
host1, WRITE, ALLOW);
+        AccessControlEntry acl3 = new AccessControlEntry(user2.toString(), 
host2, READ, ALLOW);
+        AccessControlEntry acl4 = new AccessControlEntry(user2.toString(), 
host2, WRITE, ALLOW);
+
+        Set<AccessControlEntry> acls = changeAclAndVerify(Set.of(), 
Set.of(acl1, acl2, acl3, acl4), Set.of());
+
+        //test addAcl is additive
+        AccessControlEntry acl5 = new AccessControlEntry(user2.toString(), 
WILDCARD_HOST, READ, ALLOW);
+        acls = changeAclAndVerify(acls, Set.of(acl5), Set.of());
+
+        //test get by principal name.
+        TestUtils.waitForCondition(() -> Set.of(acl1, acl2).stream().map(acl 
-> new AclBinding(resource, 
acl)).collect(Collectors.toSet()).equals(getAcls(authorizer, user1)),
+                "changes not propagated in timeout period");
+        TestUtils.waitForCondition(() -> Set.of(acl3, acl4, 
acl5).stream().map(acl -> new AclBinding(resource, 
acl)).collect(Collectors.toSet()).equals(getAcls(authorizer, user2)),
+                "changes not propagated in timeout period");
+
+        Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcls = Map.of(
+                new ResourcePattern(TOPIC, WILDCARD_RESOURCE, LITERAL), 
Set.of(new AccessControlEntry(user2.toString(), WILDCARD_HOST, READ, ALLOW)),
+                new ResourcePattern(CLUSTER, WILDCARD_RESOURCE, LITERAL), 
Set.of(new AccessControlEntry(user2.toString(), host1, READ, ALLOW)),
+                new ResourcePattern(GROUP, WILDCARD_RESOURCE, LITERAL), acls,
+                new ResourcePattern(GROUP, "test-ConsumerGroup", LITERAL), acls
+        );
+
+        for (Map.Entry<ResourcePattern, Set<AccessControlEntry>> entry : 
resourceToAcls.entrySet()) {
+            ResourcePattern key = entry.getKey();
+            Set<AccessControlEntry> value = entry.getValue();
+            changeAclAndVerify(Set.of(), value, Set.of(), key);
+        }
+
+        Set<AclBinding> expectedAcls = new HashSet<>();
+        resourceToAcls.forEach((res, aces) ->
+            aces.forEach(ace -> expectedAcls.add(new AclBinding(res, ace)))
+        );
+        acls.forEach(acl -> expectedAcls.add(new AclBinding(resource, acl)));
+        TestUtils.waitForCondition(() -> 
expectedAcls.equals(getAcls(authorizer)), "changes not propagated in timeout 
period.");
+
+        //test remove acl from existing acls.
+        changeAclAndVerify(acls, Set.of(), Set.of(acl1, acl5));
+
+        //test remove all acls for resource
+        removeAcls(authorizer, Set.of(), resource);
+        ServerTestUtils.waitAndVerifyAcls(Set.of(), authorizer, resource, 
AccessControlEntryFilter.ANY);
+
+        acls = changeAclAndVerify(Set.of(), Set.of(acl1), Set.of());
+        changeAclAndVerify(acls, Set.of(), acls);
+    }
+
+    @Test
+    public void testLocalConcurrentModificationOfResourceAcls() throws 
Exception {
+        ResourcePattern commonResource = new ResourcePattern(TOPIC, "test", 
LITERAL);
+
+        KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
username);
+        AccessControlEntry acl1 = new AccessControlEntry(user1.toString(), 
WILDCARD_HOST, READ, ALLOW);
+
+        KafkaPrincipal user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
"bob");
+        AccessControlEntry acl2 = new AccessControlEntry(user2.toString(), 
WILDCARD_HOST, READ, DENY);
+
+        addAcls(authorizer, Set.of(acl1), commonResource);
+        addAcls(authorizer, Set.of(acl2), commonResource);
+
+        ServerTestUtils.waitAndVerifyAcls(Set.of(acl1, acl2), authorizer, 
commonResource, AccessControlEntryFilter.ANY);
+    }
+
+    /**
+     * Test ACL inheritance, as described in {@link 
org.apache.kafka.common.acl.AclOperation}
+     */
+    @Test
+    public void testAclInheritance() throws Exception {
+        testImplicationsOfAllow(AclOperation.ALL, Set.of(READ, WRITE, CREATE, 
DELETE, ALTER, DESCRIBE,
+                CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, 
IDEMPOTENT_WRITE, CREATE_TOKENS, DESCRIBE_TOKENS, TWO_PHASE_COMMIT));
+        testImplicationsOfDeny(AclOperation.ALL, Set.of(READ, WRITE, CREATE, 
DELETE, ALTER, DESCRIBE,
+                CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, 
IDEMPOTENT_WRITE, CREATE_TOKENS, DESCRIBE_TOKENS, TWO_PHASE_COMMIT));
+        testImplicationsOfAllow(READ, Set.of(DESCRIBE));
+        testImplicationsOfAllow(WRITE, Set.of(DESCRIBE));
+        testImplicationsOfAllow(DELETE, Set.of(DESCRIBE));
+        testImplicationsOfAllow(ALTER, Set.of(DESCRIBE));
+        testImplicationsOfDeny(DESCRIBE, Set.of());
+        testImplicationsOfAllow(ALTER_CONFIGS, Set.of(DESCRIBE_CONFIGS));
+        testImplicationsOfDeny(DESCRIBE_CONFIGS, Set.of());
+    }
+
+    private void testImplicationsOfAllow(AclOperation parentOp, 
Set<AclOperation> allowedOps) throws Exception {
+        KafkaPrincipal user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
username);
+        InetAddress host = InetAddress.getByName("192.168.3.1");
+        RequestContext hostContext = newRequestContext(user, host);
+        AccessControlEntry acl = new AccessControlEntry(user.toString(), 
WILDCARD_HOST, parentOp, ALLOW);
+        addAcls(authorizer, Set.of(acl), clusterResource);
+        for (AclOperation op : AclOperation.values()) {
+            if (invalidOp(op)) continue;
+            boolean authorized = authorize(authorizer, hostContext, op, 
clusterResource);
+            if (allowedOps.contains(op) || op == parentOp) {
+                assertTrue(authorized, "ALLOW " + parentOp + " should imply 
ALLOW " + op);
+            } else {
+                assertFalse(authorized, "ALLOW " + parentOp + " should not 
imply ALLOW " + op);
+            }
+        }
+        removeAcls(authorizer, Set.of(acl), clusterResource);
+    }
+
+    private void testImplicationsOfDeny(AclOperation parentOp, 
Set<AclOperation> deniedOps) throws Exception {
+        KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
username);
+        InetAddress host1 = InetAddress.getByName("192.168.3.1");
+        RequestContext host1Context = newRequestContext(user1, host1);
+        Set<AccessControlEntry> acls = Set.of(new 
AccessControlEntry(user1.toString(), WILDCARD_HOST, parentOp, DENY),
+                new AccessControlEntry(user1.toString(), WILDCARD_HOST, 
AclOperation.ALL, ALLOW));
+        addAcls(authorizer, acls, clusterResource);
+        for (AclOperation op : AclOperation.values()) {
+            if (invalidOp(op)) continue;
+            boolean authorized = authorize(authorizer, host1Context, op, 
clusterResource);
+            if (deniedOps.contains(op) || op == parentOp) {
+                assertFalse(authorized, "DENY " + parentOp + " should imply 
DENY " + op);
+            } else {
+                assertTrue(authorized, "DENY " + parentOp + " should not imply 
DENY " + op);
+            }
+        }
+        removeAcls(authorizer, acls, clusterResource);
+    }
+
+    @Test
+    public void testAccessAllowedIfAllowAclExistsOnWildcardResource() throws 
Exception {
+        addAcls(authorizer, Set.of(allowReadAcl), wildCardResource);
+
+        assertTrue(authorize(authorizer, requestContext, READ, resource));
+    }
+
+    @Test
+    public void testDeleteAclOnWildcardResource() throws Exception {
+        addAcls(authorizer, Set.of(allowReadAcl, allowWriteAcl), 
wildCardResource);
+
+        removeAcls(authorizer, Set.of(allowReadAcl), wildCardResource);
+
+        assertEquals(Set.of(allowWriteAcl), getAcls(authorizer, 
wildCardResource));
+    }
+
+    @Test
+    public void testDeleteAllAclOnWildcardResource() throws Exception {
+        addAcls(authorizer, Set.of(allowReadAcl), wildCardResource);
+
+        removeAcls(authorizer, Set.of(), wildCardResource);
+
+        assertEquals(Set.of(), getAcls(authorizer));
+    }
+
+    @Test
+    public void testAccessAllowedIfAllowAclExistsOnPrefixedResource() throws 
Exception {
+        addAcls(authorizer, Set.of(allowReadAcl), prefixedResource);
+
+        assertTrue(authorize(authorizer, requestContext, READ, resource));
+    }
+
+    @Test
+    public void testDeleteAclOnPrefixedResource() throws Exception {
+        addAcls(authorizer, Set.of(allowReadAcl, allowWriteAcl), 
prefixedResource);
+
+        removeAcls(authorizer, Set.of(allowReadAcl), prefixedResource);
+
+        assertEquals(Set.of(allowWriteAcl), getAcls(authorizer, 
prefixedResource));
+    }
+
+    @Test
+    public void testDeleteAllAclOnPrefixedResource() throws Exception {
+        addAcls(authorizer, Set.of(allowReadAcl, allowWriteAcl), 
prefixedResource);
+
+        removeAcls(authorizer, Set.of(), prefixedResource);
+
+        assertEquals(Set.of(), getAcls(authorizer));
+    }
+
+    @Test
+    public void testAddAclsOnLiteralResource() throws Exception {
+        addAcls(authorizer, Set.of(allowReadAcl, allowWriteAcl), resource);
+        addAcls(authorizer, Set.of(allowWriteAcl, denyReadAcl), resource);
+
+        assertEquals(Set.of(allowReadAcl, allowWriteAcl, denyReadAcl), 
getAcls(authorizer, resource));
+        assertEquals(Set.of(), getAcls(authorizer, wildCardResource));
+        assertEquals(Set.of(), getAcls(authorizer, prefixedResource));
+    }
+
+    @Test
+    public void testAddAclsOnWildcardResource() throws Exception {
+        addAcls(authorizer, Set.of(allowReadAcl, allowWriteAcl), 
wildCardResource);
+        addAcls(authorizer, Set.of(allowWriteAcl, denyReadAcl), 
wildCardResource);
+
+        assertEquals(Set.of(allowReadAcl, allowWriteAcl, denyReadAcl), 
getAcls(authorizer, wildCardResource));
+        assertEquals(Set.of(), getAcls(authorizer, resource));
+        assertEquals(Set.of(), getAcls(authorizer, prefixedResource));
+    }
+
+    @Test
+    public void testAddAclsOnPrefixedResource() throws Exception {
+        addAcls(authorizer, Set.of(allowReadAcl, allowWriteAcl), 
prefixedResource);
+        addAcls(authorizer, Set.of(allowWriteAcl, denyReadAcl), 
prefixedResource);
+
+        assertEquals(Set.of(allowReadAcl, allowWriteAcl, denyReadAcl), 
getAcls(authorizer, prefixedResource));
+        assertEquals(Set.of(), getAcls(authorizer, wildCardResource));
+        assertEquals(Set.of(), getAcls(authorizer, resource));
+    }
+
+    @Test
+    public void testAuthorizeWithPrefixedResource() throws Exception {
+        addAcls(authorizer, Set.of(denyReadAcl), new ResourcePattern(TOPIC, 
"a_other", LITERAL));
+        addAcls(authorizer, Set.of(denyReadAcl), new ResourcePattern(TOPIC, 
"a_other", PREFIXED));
+        addAcls(authorizer, Set.of(denyReadAcl), new ResourcePattern(TOPIC, 
"foo-" + UUID.randomUUID(), PREFIXED));
+        addAcls(authorizer, Set.of(denyReadAcl), new ResourcePattern(TOPIC, 
"foo-" + UUID.randomUUID(), PREFIXED));
+        addAcls(authorizer, Set.of(denyReadAcl), new ResourcePattern(TOPIC, 
"foo-" + UUID.randomUUID() + "-zzz", PREFIXED));
+        addAcls(authorizer, Set.of(denyReadAcl), new ResourcePattern(TOPIC, 
"fooo-" + UUID.randomUUID(), PREFIXED));
+        addAcls(authorizer, Set.of(denyReadAcl), new ResourcePattern(TOPIC, 
"fo-" + UUID.randomUUID(), PREFIXED));
+        addAcls(authorizer, Set.of(denyReadAcl), new ResourcePattern(TOPIC, 
"fop-" + UUID.randomUUID(), PREFIXED));
+        addAcls(authorizer, Set.of(denyReadAcl), new ResourcePattern(TOPIC, 
"fon-" + UUID.randomUUID(), PREFIXED));
+        addAcls(authorizer, Set.of(denyReadAcl), new ResourcePattern(TOPIC, 
"fon-", PREFIXED));
+        addAcls(authorizer, Set.of(denyReadAcl), new ResourcePattern(TOPIC, 
"z_other", PREFIXED));
+        addAcls(authorizer, Set.of(denyReadAcl), new ResourcePattern(TOPIC, 
"z_other", LITERAL));
+
+        addAcls(authorizer, Set.of(allowReadAcl), prefixedResource);
+
+        assertTrue(authorize(authorizer, requestContext, READ, resource));
+    }
+
+    @Test
+    public void testSingleCharacterResourceAcls() throws Exception {
+        addAcls(authorizer, Set.of(allowReadAcl), new ResourcePattern(TOPIC, 
"f", LITERAL));
+        assertTrue(authorize(authorizer, requestContext, READ, new 
ResourcePattern(TOPIC, "f", LITERAL)));
+        assertFalse(authorize(authorizer, requestContext, READ, new 
ResourcePattern(TOPIC, "foo", LITERAL)));
+
+        addAcls(authorizer, Set.of(allowReadAcl), new ResourcePattern(TOPIC, 
"_", PREFIXED));
+        assertTrue(authorize(authorizer, requestContext, READ, new 
ResourcePattern(TOPIC, "_foo", LITERAL)));
+        assertTrue(authorize(authorizer, requestContext, READ, new 
ResourcePattern(TOPIC, "_", LITERAL)));
+        assertFalse(authorize(authorizer, requestContext, READ, new 
ResourcePattern(TOPIC, "foo_", LITERAL)));
+    }
+
+    @Test
+    public void testGetAclsPrincipal() throws Exception {
+        AccessControlEntry aclOnSpecificPrincipal = new 
AccessControlEntry(principal.toString(), WILDCARD_HOST, WRITE, ALLOW);
+        addAcls(authorizer, Set.of(aclOnSpecificPrincipal), resource);
+
+        assertEquals(0,
+                getAcls(authorizer, wildcardPrincipal).size(), "acl on 
specific should not be returned for wildcard request");
+        assertEquals(1,
+                getAcls(authorizer, principal).size(), "acl on specific should 
be returned for specific request");
+        assertEquals(1,
+                getAcls(authorizer, new 
KafkaPrincipal(principal.getPrincipalType(), principal.getName())).size(), "acl 
on specific should be returned for different principal instance");
+
+        removeAcls(authorizer, Set.of(), resource);
+        AccessControlEntry aclOnWildcardPrincipal = new 
AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, WRITE, ALLOW);
+        addAcls(authorizer, Set.of(aclOnWildcardPrincipal), resource);
+
+        assertEquals(1, getAcls(authorizer, wildcardPrincipal).size(), "acl on 
wildcard should be returned for wildcard request");
+        assertEquals(0, getAcls(authorizer, principal).size(), "acl on 
wildcard should not be returned for specific request");
+    }
+
+    @Test
+    public void testAclsFilter() throws Exception {
+        ResourcePattern resource1 = new ResourcePattern(TOPIC, "foo-" + 
UUID.randomUUID(), LITERAL);
+        ResourcePattern resource2 = new ResourcePattern(TOPIC, "bar-" + 
UUID.randomUUID(), LITERAL);
+        ResourcePattern prefixedResource = new ResourcePattern(TOPIC, "bar-", 
PREFIXED);
+
+        AclBinding acl1 = new AclBinding(resource1, new 
AccessControlEntry(principal.toString(), WILDCARD_HOST, READ, ALLOW));
+        AclBinding acl2 = new AclBinding(resource1, new 
AccessControlEntry(principal.toString(), "192.168.0.1", WRITE, ALLOW));
+        AclBinding acl3 = new AclBinding(resource2, new 
AccessControlEntry(principal.toString(), WILDCARD_HOST, DESCRIBE, ALLOW));
+        AclBinding acl4 = new AclBinding(prefixedResource, new 
AccessControlEntry(wildcardPrincipal.toString(), WILDCARD_HOST, READ, ALLOW));
+
+        authorizer.createAcls(requestContext, List.of(acl1, acl2, acl3, acl4));
+        assertEquals(Set.of(acl1, acl2, acl3, acl4), 
toSet(authorizer.acls(AclBindingFilter.ANY)));
+        assertEquals(Set.of(acl1, acl2), toSet(authorizer.acls(new 
AclBindingFilter(resource1.toFilter(), AccessControlEntryFilter.ANY))));
+        assertEquals(Set.of(acl4), toSet(authorizer.acls(new 
AclBindingFilter(prefixedResource.toFilter(), AccessControlEntryFilter.ANY))));
+        AclBindingFilter matchingFilter = new AclBindingFilter(new 
ResourcePatternFilter(ResourceType.ANY, resource2.name(), MATCH), 
AccessControlEntryFilter.ANY);
+        assertEquals(Set.of(acl3, acl4), 
toSet(authorizer.acls(matchingFilter)));
+
+        List<AclBindingFilter> filters = List.of(matchingFilter,
+                acl1.toFilter(),
+                new AclBindingFilter(resource2.toFilter(), 
AccessControlEntryFilter.ANY),
+                new AclBindingFilter(new ResourcePatternFilter(TOPIC, "baz", 
PatternType.ANY), AccessControlEntryFilter.ANY));
+        List<AclDeleteResult> deleteResults = new ArrayList<>();
+        for (CompletionStage<AclDeleteResult> stage : 
authorizer.deleteAcls(requestContext, filters)) {
+            deleteResults.add(stage.toCompletableFuture().get());
+        }
+        assertTrue(deleteResults.stream().noneMatch(r -> 
r.exception().isPresent()));
+        for (int i = 0; i < filters.size(); i++) {
+            
assertTrue(deleteResults.get(i).aclBindingDeleteResults().stream().noneMatch(r 
-> r.exception().isPresent()));
+        }
+        assertEquals(Set.of(acl3, acl4), 
deleteResults.get(0).aclBindingDeleteResults().stream().map(AclDeleteResult.AclBindingDeleteResult::aclBinding).collect(Collectors.toSet()));
+        assertEquals(Set.of(acl1), 
deleteResults.get(1).aclBindingDeleteResults().stream().map(AclDeleteResult.AclBindingDeleteResult::aclBinding).collect(Collectors.toSet()));
+        // The standard authorizer first finds the acls that match the filters and 
then deletes them.
+        // So filters[2] still matches acl3, even though acl3 also matches 
filters[0] and will be deleted by it.
+        assertEquals(Set.of(acl3), 
deleteResults.get(2).aclBindingDeleteResults().stream().map(AclDeleteResult.AclBindingDeleteResult::aclBinding).collect(Collectors.toSet()));
+        assertEquals(Set.of(), 
deleteResults.get(3).aclBindingDeleteResults().stream().map(AclDeleteResult.AclBindingDeleteResult::aclBinding).collect(Collectors.toSet()));
+    }
+
+    @Test
+    public void testAuthorizeByResourceTypeNoAclFoundOverride() throws 
IOException {
+        Map<String, Object> cfg = configs();
+        cfg.put(StandardAuthorizer.ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, 
"true");
+
+        try (Authorizer authorizer = createAuthorizer(cfg)) {
+            assertTrue(authorizeByResourceType(authorizer, requestContext, 
READ, resource.resourceType()),
+                    "If allow.everyone.if.no.acl.found = true, caller should 
have read access to at least one topic");
+            assertTrue(authorizeByResourceType(authorizer, requestContext, 
WRITE, resource.resourceType()),
+                    "If allow.everyone.if.no.acl.found = true, caller should 
have write access to at least one topic");
+        }
+    }
+
+    private <T> Set<T> toSet(Iterable<T> iterable) {
+        return StreamSupport.stream(iterable.spliterator(), 
false).collect(Collectors.toSet());
+    }
+
+    private Set<AccessControlEntry> changeAclAndVerify(Set<AccessControlEntry> 
originalAcls,
+                                                       Set<AccessControlEntry> 
addedAcls,
+                                                       Set<AccessControlEntry> 
removedAcls) throws Exception {
+        return changeAclAndVerify(originalAcls, addedAcls, removedAcls, 
resource);
+    }
+
+    private Set<AccessControlEntry> changeAclAndVerify(Set<AccessControlEntry> 
originalAcls,
+                                                       Set<AccessControlEntry> 
addedAcls,
+                                                       Set<AccessControlEntry> 
removedAcls,
+                                                       ResourcePattern 
resource) throws Exception {
+        Set<AccessControlEntry> acls = new HashSet<>(originalAcls);
+
+        if (!addedAcls.isEmpty()) {
+            addAcls(authorizer, addedAcls, resource);
+            acls.addAll(addedAcls);
+        }
+
+        if (!removedAcls.isEmpty()) {
+            removeAcls(authorizer, removedAcls, resource);
+            acls.removeAll(removedAcls);
+        }
+
+        ServerTestUtils.waitAndVerifyAcls(acls, authorizer, resource, 
AccessControlEntryFilter.ANY);
+
+        return acls;
+    }
+
+    private boolean authorize(Authorizer authorizer, RequestContext 
requestContext, AclOperation operation, ResourcePattern resource) {
+        Action action = new Action(operation, resource, 1, true, true);
+        return authorizer.authorize(requestContext, List.of(action)).get(0) == 
AuthorizationResult.ALLOWED;
+    }
+
+    private Set<AccessControlEntry> getAcls(Authorizer authorizer, 
ResourcePattern resourcePattern) {
+        return toSet(authorizer.acls(new 
AclBindingFilter(resourcePattern.toFilter(), 
AccessControlEntryFilter.ANY))).stream()
+            .map(AclBinding::entry).collect(Collectors.toSet());
+    }
+
+    private Set<AclBinding> getAcls(Authorizer authorizer, KafkaPrincipal 
principal) {
+        AclBindingFilter filter = new 
AclBindingFilter(ResourcePatternFilter.ANY,
+                new AccessControlEntryFilter(principal.toString(), null, 
AclOperation.ANY, AclPermissionType.ANY));
+        return toSet(authorizer.acls(filter));
+    }
+
+    private Set<AclBinding> getAcls(Authorizer authorizer) {
+        return toSet(authorizer.acls(AclBindingFilter.ANY));
+    }
+
+    private boolean invalidOp(AclOperation op) {
+        return op == AclOperation.ANY || op == AclOperation.UNKNOWN;
+    }
+
+    private Authorizer createAuthorizer(Map<String, ?> configs) {
+        Metrics metrics = new Metrics();
+        metricsInstances.add(metrics);
+        PluginMetricsImpl pluginMetrics = new PluginMetricsImpl(metrics, 
Map.of());
+        pluginMetricsInstances.add(pluginMetrics);
+        StandardAuthorizer authorizer = new StandardAuthorizer();
+        authorizer.configure(configs);
+        authorizer.withPluginMetrics(pluginMetrics);
+        authorizer.start(new AuthorizerTestServerInfo(List.of(plaintext)));
+        authorizer.setAclMutator(new MockAclMutator(authorizer));
+        authorizer.completeInitialLoad();
+        return authorizer;
+    }
+
+    private RequestContext newRequestContext(KafkaPrincipal principal, 
InetAddress clientAddress) {
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
+        RequestHeader header = new RequestHeader(ApiKeys.PRODUCE, (short) 2, 
"", 1);
+        return new RequestContext(header, "", clientAddress, principal, 
ListenerName.forSecurityProtocol(securityProtocol),
+                securityProtocol, ClientInformation.EMPTY, false);
+    }
+
+    private boolean authorizeByResourceType(Authorizer authorizer, 
RequestContext requestContext, AclOperation operation, ResourceType 
resourceType) {
+        return authorizer.authorizeByResourceType(requestContext, operation, 
resourceType) == AuthorizationResult.ALLOWED;
+    }
+
+    private void addAcls(Authorizer authorizer, Set<AccessControlEntry> aces, 
ResourcePattern resourcePattern) throws Exception {
+        List<AclBinding> bindings = aces.stream().map(ace -> new 
AclBinding(resourcePattern, ace)).toList();
+        List<? extends CompletionStage<AclCreateResult>> results = 
authorizer.createAcls(requestContext, bindings);
+        for (CompletionStage<AclCreateResult> ac : results) {
+            AclCreateResult result = ac.toCompletableFuture().get();
+            result.exception().ifPresent(e -> {
+                throw e;
+            });
+        }
+    }
+
+    private void removeAcls(Authorizer authorizer, Set<AccessControlEntry> 
aces, ResourcePattern resourcePattern) throws Exception {
+        List<AclBindingFilter> filters = aces.isEmpty()
+            ? List.of(new AclBindingFilter(resourcePattern.toFilter(), 
AccessControlEntryFilter.ANY))
+            : aces.stream().map(ace -> new AclBinding(resourcePattern, 
ace).toFilter()).toList();
+
+        for (CompletionStage<AclDeleteResult> stage : 
authorizer.deleteAcls(requestContext, filters)) {
+            AclDeleteResult result = stage.toCompletableFuture().get();
+            result.exception().ifPresent(e -> {
+                throw e;
+            });
+            for (AclDeleteResult.AclBindingDeleteResult r : 
result.aclBindingDeleteResults()) {
+                r.exception().ifPresent(e -> {
+                    throw e;
+                });
+            }
+        }
+    }
+
+}

Reply via email to