This is an automated email from the ASF dual-hosted git repository. manikumar pushed a commit to branch 3.4 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.4 by this push: new b7a8fd7bfe6 KAKFA-14733: Added a few missing checks for Kraft Authorizer and updated AclAuthorizerTest to run tests for both zk and kraft (#13282) b7a8fd7bfe6 is described below commit b7a8fd7bfe6c1eaa6760cdb881b14475f25b80f7 Author: Purshotam Chauhan <pchau...@confluent.io> AuthorDate: Tue Feb 21 19:21:15 2023 +0530 KAKFA-14733: Added a few missing checks for Kraft Authorizer and updated AclAuthorizerTest to run tests for both zk and kraft (#13282) Added the following checks - * In StandardAuthorizerData.authorize() to fail if `patternType` other than `LITERAL` is passed. * In AclControlManager.addAcl() to fail if Resource Name is null or empty. Also, updated `AclAuthorizerTest` includes a lot of tests covering various scenarios that are missing in `StandardAuthorizerTest`. This PR changes the AclAuthorizerTest to run tests for both `zk` and `kraft` modes - * Rename AclAuthorizerTest -> AuthorizerTest * Parameterize relevant tests to run for both modes Reviewers: Manikumar Reddy <manikumar.re...@gmail.com> --- ...clAuthorizerTest.scala => AuthorizerTest.scala} | 584 ++++++++++++--------- .../apache/kafka/controller/AclControlManager.java | 3 + .../authorizer/StandardAuthorizerData.java | 3 + .../kafka/controller/MockAclControlManager.java | 50 ++ .../kafka/metadata/authorizer/MockAclMutator.java | 62 +++ .../authorizer/StandardAuthorizerTest.java | 4 +- 6 files changed, 459 insertions(+), 247 deletions(-) diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala similarity index 61% rename from core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala rename to core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala index 9fde3cbadb2..c39b785e38a 100644 --- a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala +++ b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala @@ -19,9 +19,10 @@ package kafka.security.authorizer import kafka.Kafka import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString} import kafka.server.{KafkaConfig, QuorumTestHarness} -import kafka.utils.TestUtils +import kafka.utils.{TestInfoUtils, TestUtils} import kafka.zk.ZkAclStore import kafka.zookeeper.{GetChildrenRequest, GetDataRequest, ZooKeeperClient} +import org.apache.kafka.common.Endpoint import org.apache.kafka.common.acl.AclOperation._ import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY} import org.apache.kafka.common.acl._ @@ -32,24 +33,34 @@ import org.apache.kafka.common.resource.Resource.CLUSTER_NAME import org.apache.kafka.common.resource.ResourcePattern.WILDCARD_RESOURCE import org.apache.kafka.common.resource.ResourceType._ import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType} -import org.apache.kafka.common.security.auth.KafkaPrincipal +import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.utils.{Time, SecurityUtils => JSecurityUtils} +import org.apache.kafka.metadata.authorizer.StandardAuthorizerTest.AuthorizerTestServerInfo +import org.apache.kafka.metadata.authorizer.{MockAclMutator, StandardAuthorizer} import org.apache.kafka.server.authorizer._ import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion.{IBP_2_0_IV0, IBP_2_0_IV1} import org.apache.zookeeper.client.ZKClientConfig import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import java.net.InetAddress import java.nio.charset.StandardCharsets.UTF_8 import java.nio.file.Files +import java.util import java.util.concurrent.{Executors, Semaphore, TimeUnit} -import java.util.{Collections, UUID} +import java.util.{Collections, Properties, UUID} import scala.collection.mutable import scala.jdk.CollectionConverters._ -class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { +class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { + + private final val PLAINTEXT = new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "127.0.0.1", 9020) + private final val KRAFT = "kraft" + private final val ZK = "zk" + private val allowReadAcl = new AccessControlEntry(WildcardPrincipalString, WildcardHost, READ, ALLOW) private val allowWriteAcl = new AccessControlEntry(WildcardPrincipalString, WildcardHost, WRITE, ALLOW) @@ -60,66 +71,84 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { private val clusterResource = new ResourcePattern(CLUSTER, CLUSTER_NAME, LITERAL) private val wildcardPrincipal = JSecurityUtils.parseKafkaPrincipal(WildcardPrincipalString) - private val aclAuthorizer = new AclAuthorizer - private val aclAuthorizer2 = new AclAuthorizer + private var authorizer1: Authorizer = _ + private var authorizer2: Authorizer = _ + + private var _testInfo: TestInfo = _ class CustomPrincipal(principalType: String, name: String) extends KafkaPrincipal(principalType, name) { override def equals(o: scala.Any): Boolean = false } - override def authorizer: Authorizer = aclAuthorizer + override def authorizer: Authorizer = authorizer1 + + def testInfo: TestInfo = _testInfo @BeforeEach override def setUp(testInfo: TestInfo): Unit = { super.setUp(testInfo) + _testInfo = testInfo - // Increase maxUpdateRetries to avoid transient failures - aclAuthorizer.maxUpdateRetries = Int.MaxValue - aclAuthorizer2.maxUpdateRetries = Int.MaxValue - - val props = TestUtils.createBrokerConfig(0, zkConnect) - props.put(AclAuthorizer.SuperUsersProp, superUsers) - + val props = properties config = KafkaConfig.fromProps(props) - aclAuthorizer.configure(config.originals) - aclAuthorizer2.configure(config.originals) + authorizer1 = createAuthorizer(config.originals) + authorizer2 = createAuthorizer(config.originals) resource = new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), LITERAL) - zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests, - Time.SYSTEM, "kafka.test", "AclAuthorizerTest", new ZKClientConfig, "AclAuthorizerTest") + if (!TestInfoUtils.isKRaft(testInfo)) { + zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests, + Time.SYSTEM, "kafka.test", "AclAuthorizerTest", new ZKClientConfig, "AclAuthorizerTest") + // Increase maxUpdateRetries to avoid transient failures + authorizer1.asInstanceOf[AclAuthorizer].maxUpdateRetries = Int.MaxValue + authorizer2.asInstanceOf[AclAuthorizer].maxUpdateRetries = Int.MaxValue + } + } + + def properties: Properties = { + val props = TestUtils.createBrokerConfig(0, zkConnectOrNull) + props.put(AclAuthorizer.SuperUsersProp, superUsers) + props } @AfterEach override def tearDown(): Unit = { - aclAuthorizer.close() - aclAuthorizer2.close() - zooKeeperClient.close() + authorizer1.close() + authorizer2.close() + TestUtils.clearYammerMetrics() + if (!TestInfoUtils.isKRaft(_testInfo)) { + zooKeeperClient.close() + } super.tearDown() } - @Test - def testAuthorizeThrowsOnNonLiteralResource(): Unit = { - assertThrows(classOf[IllegalArgumentException], () => authorize(aclAuthorizer, requestContext, READ, + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array(KRAFT, ZK)) + def testAuthorizeThrowsOnNonLiteralResource(quorum: String): Unit = { + assertThrows(classOf[IllegalArgumentException], () => authorize(authorizer1, requestContext, READ, new ResourcePattern(TOPIC, "something", PREFIXED))) } - @Test - def testAuthorizeWithEmptyResourceName(): Unit = { - assertFalse(authorize(aclAuthorizer, requestContext, READ, new ResourcePattern(GROUP, "", LITERAL))) - addAcls(aclAuthorizer, Set(allowReadAcl), new ResourcePattern(GROUP, WILDCARD_RESOURCE, LITERAL)) - assertTrue(authorize(aclAuthorizer, requestContext, READ, new ResourcePattern(GROUP, "", LITERAL))) + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array(KRAFT, ZK)) + def testAuthorizeWithEmptyResourceName(quorum: String): Unit = { + assertFalse(authorize(authorizer1, requestContext, READ, new ResourcePattern(GROUP, "", LITERAL))) + addAcls(authorizer1, Set(allowReadAcl), new ResourcePattern(GROUP, WILDCARD_RESOURCE, LITERAL)) + assertTrue(authorize(authorizer1, requestContext, READ, new ResourcePattern(GROUP, "", LITERAL))) } // Authorizing the empty resource is not supported because we create a znode with the resource name. - @Test - def testEmptyAclThrowsException(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array(KRAFT, ZK)) + def testEmptyAclThrowsException(quorum: String): Unit = { val e = assertThrows(classOf[ApiException], - () => addAcls(aclAuthorizer, Set(allowReadAcl), new ResourcePattern(GROUP, "", LITERAL))) - assertTrue(e.getCause.isInstanceOf[IllegalArgumentException], s"Unexpected exception $e") + () => addAcls(authorizer1, Set(allowReadAcl), new ResourcePattern(GROUP, "", LITERAL))) + if (quorum.equals(ZK)) + assertTrue(e.getCause.isInstanceOf[IllegalArgumentException], s"Unexpected exception $e") } - @Test - def testTopicAcl(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array(KRAFT, ZK)) + def testTopicAcl(quorum: String): Unit = { val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username) val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "rob") val user3 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "batman") @@ -152,29 +181,30 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { val host1Context = newRequestContext(user1, host1) val host2Context = newRequestContext(user1, host2) - assertTrue(authorize(aclAuthorizer, host2Context, READ, resource), "User1 should have READ access from host2") - assertFalse(authorize(aclAuthorizer, host1Context, READ, resource), "User1 should not have READ access from host1 due to denyAcl") - assertTrue(authorize(aclAuthorizer, host1Context, WRITE, resource), "User1 should have WRITE access from host1") - assertFalse(authorize(aclAuthorizer, host2Context, WRITE, resource), "User1 should not have WRITE access from host2 as no allow acl is defined") - assertTrue(authorize(aclAuthorizer, host1Context, DESCRIBE, resource), "User1 should not have DESCRIBE access from host1") - assertTrue(authorize(aclAuthorizer, host2Context, DESCRIBE, resource), "User1 should have DESCRIBE access from host2") - assertFalse(authorize(aclAuthorizer, host1Context, ALTER, resource), "User1 should not have edit access from host1") - assertFalse(authorize(aclAuthorizer, host2Context, ALTER, resource), "User1 should not have edit access from host2") + assertTrue(authorize(authorizer1, host2Context, READ, resource), "User1 should have READ access from host2") + assertFalse(authorize(authorizer1, host1Context, READ, resource), "User1 should not have READ access from host1 due to denyAcl") + assertTrue(authorize(authorizer1, host1Context, WRITE, resource), "User1 should have WRITE access from host1") + assertFalse(authorize(authorizer1, host2Context, WRITE, resource), "User1 should not have WRITE access from host2 as no allow acl is defined") + assertTrue(authorize(authorizer1, host1Context, DESCRIBE, resource), "User1 should not have DESCRIBE access from host1") + assertTrue(authorize(authorizer1, host2Context, DESCRIBE, resource), "User1 should have DESCRIBE access from host2") + assertFalse(authorize(authorizer1, host1Context, ALTER, resource), "User1 should not have edit access from host1") + assertFalse(authorize(authorizer1, host2Context, ALTER, resource), "User1 should not have edit access from host2") - //test if user has READ and write access they also get describe access + //test if user has READ or WRITE access they also get DESCRIBE access val user2Context = newRequestContext(user2, host1) val user3Context = newRequestContext(user3, host1) - assertTrue(authorize(aclAuthorizer, user2Context, DESCRIBE, resource), "User2 should have DESCRIBE access from host1") - assertTrue(authorize(aclAuthorizer, user3Context, DESCRIBE, resource), "User3 should have DESCRIBE access from host2") - assertTrue(authorize(aclAuthorizer, user2Context, READ, resource), "User2 should have READ access from host1") - assertTrue(authorize(aclAuthorizer, user3Context, WRITE, resource), "User3 should have WRITE access from host2") + assertTrue(authorize(authorizer1, user2Context, DESCRIBE, resource), "User2 should have DESCRIBE access from host1") + assertTrue(authorize(authorizer1, user3Context, DESCRIBE, resource), "User3 should have DESCRIBE access from host2") + assertTrue(authorize(authorizer1, user2Context, READ, resource), "User2 should have READ access from host1") + assertTrue(authorize(authorizer1, user3Context, WRITE, resource), "User3 should have WRITE access from host2") } /** CustomPrincipals should be compared with their principal type and name */ - @Test - def testAllowAccessWithCustomPrincipal(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array(KRAFT, ZK)) + def testAllowAccessWithCustomPrincipal(quorum: String): Unit = { val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username) val customUserPrincipal = new CustomPrincipal(KafkaPrincipal.USER_TYPE, username) val host1 = InetAddress.getByName("192.168.1.1") @@ -189,12 +219,13 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { val host1Context = newRequestContext(customUserPrincipal, host1) val host2Context = newRequestContext(customUserPrincipal, host2) - assertTrue(authorize(aclAuthorizer, host2Context, READ, resource), "User1 should have READ access from host2") - assertFalse(authorize(aclAuthorizer, host1Context, READ, resource), "User1 should not have READ access from host1 due to denyAcl") + assertTrue(authorize(authorizer1, host2Context, READ, resource), "User1 should have READ access from host2") + assertFalse(authorize(authorizer1, host1Context, READ, resource), "User1 should not have READ access from host1 due to denyAcl") } - @Test - def testDenyTakesPrecedence(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array(KRAFT, ZK)) + def testDenyTakesPrecedence(quorum: String): Unit = { val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username) val host = InetAddress.getByName("192.168.2.1") val session = newRequestContext(user, host) @@ -205,21 +236,23 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { changeAclAndVerify(Set.empty, acls, Set.empty) - assertFalse(authorize(aclAuthorizer, session, READ, resource), "deny should take precedence over allow.") + assertFalse(authorize(authorizer1, session, READ, resource), "deny should take precedence over allow.") } - @Test - def testAllowAllAccess(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array(KRAFT, ZK)) + def testAllowAllAccess(quorum: String): Unit = { val allowAllAcl = new AccessControlEntry(WildcardPrincipalString, WildcardHost, AclOperation.ALL, ALLOW) changeAclAndVerify(Set.empty, Set(allowAllAcl), Set.empty) val context = newRequestContext(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "random"), InetAddress.getByName("192.0.4.4")) - assertTrue(authorize(aclAuthorizer, context, READ, resource), "allow all acl should allow access to all.") + assertTrue(authorize(authorizer1, context, READ, resource), "allow all acl should allow access to all.") } - @Test - def testSuperUserHasAccess(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array(KRAFT, ZK)) + def testSuperUserHasAccess(quorum: String): Unit = { val denyAllAcl = new AccessControlEntry(WildcardPrincipalString, WildcardHost, AclOperation.ALL, DENY) changeAclAndVerify(Set.empty, Set(denyAllAcl), Set.empty) @@ -227,26 +260,28 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { val session1 = newRequestContext(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"), InetAddress.getByName("192.0.4.4")) val session2 = newRequestContext(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser2"), InetAddress.getByName("192.0.4.4")) - assertTrue(authorize(aclAuthorizer, session1, READ, resource), "superuser always has access, no matter what acls.") - assertTrue(authorize(aclAuthorizer, session2, READ, resource), "superuser always has access, no matter what acls.") + assertTrue(authorize(authorizer1, session1, READ, resource), "superuser always has access, no matter what acls.") + assertTrue(authorize(authorizer1, session2, READ, resource), "superuser always has access, no matter what acls.") } /** CustomPrincipals should be compared with their principal type and name */ - @Test - def testSuperUserWithCustomPrincipalHasAccess(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array(KRAFT, ZK)) + def testSuperUserWithCustomPrincipalHasAccess(quorum: String): Unit = { val denyAllAcl = new AccessControlEntry(WildcardPrincipalString, WildcardHost, AclOperation.ALL, DENY) changeAclAndVerify(Set.empty, Set(denyAllAcl), Set.empty) val session = newRequestContext(new CustomPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"), InetAddress.getByName("192.0.4.4")) - assertTrue(authorize(aclAuthorizer, session, READ, resource), "superuser with custom principal always has access, no matter what acls.") + assertTrue(authorize(authorizer1, session, READ, resource), "superuser with custom principal always has access, no matter what acls.") } - @Test - def testWildCardAcls(): Unit = { - assertFalse(authorize(aclAuthorizer, requestContext, READ, resource), "when acls = [], authorizer should fail close.") + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array(KRAFT, ZK)) + def testWildCardAcls(quorum: String): Unit = { + assertFalse(authorize(authorizer1, requestContext, READ, resource), "when acls = [], authorizer should fail close.") val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username) val host1 = InetAddress.getByName("192.168.3.1") @@ -255,7 +290,7 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { val acls = changeAclAndVerify(Set.empty, Set(readAcl), Set.empty, wildCardResource) val host1Context = newRequestContext(user1, host1) - assertTrue(authorize(aclAuthorizer, host1Context, READ, resource), "User1 should have READ access from host1") + assertTrue(authorize(authorizer1, host1Context, READ, resource), "User1 should have READ access from host1") //allow WRITE to specific topic. val writeAcl = new AccessControlEntry(user1.toString, host1.getHostAddress, WRITE, ALLOW) @@ -265,23 +300,25 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { val denyWriteOnWildCardResourceAcl = new AccessControlEntry(user1.toString, host1.getHostAddress, WRITE, DENY) changeAclAndVerify(acls, Set(denyWriteOnWildCardResourceAcl), Set.empty, wildCardResource) - assertFalse(authorize(aclAuthorizer, host1Context, WRITE, resource), "User1 should not have WRITE access from host1") + assertFalse(authorize(authorizer1, host1Context, WRITE, resource), "User1 should not have WRITE access from host1") } - @Test - def testNoAclFound(): Unit = { - assertFalse(authorize(aclAuthorizer, requestContext, READ, resource), "when acls = [], authorizer should deny op.") + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array(KRAFT, ZK)) + def testNoAclFound(quorum: String): Unit = { + assertFalse(authorize(authorizer1, requestContext, READ, resource), "when acls = [], authorizer should deny op.") } - @Test - def testNoAclFoundOverride(): Unit = { - val props = TestUtils.createBrokerConfig(1, zkConnect) + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array(KRAFT, ZK)) + def testNoAclFoundOverride(quorum: String): Unit = { + val props = properties props.put(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true") val cfg = KafkaConfig.fromProps(props) - val testAuthorizer = new AclAuthorizer + var testAuthorizer: Authorizer = null try { - testAuthorizer.configure(cfg.originals) + testAuthorizer = createAuthorizer(cfg.originals) assertTrue(authorize(testAuthorizer, requestContext, READ, resource), "when acls = null or [], authorizer should allow op with allow.everyone = true.") } finally { @@ -289,8 +326,9 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { } } - @Test - def testAclManagementAPIs(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array(KRAFT, ZK)) + def testAclManagementAPIs(quorum: String): Unit = { val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username) val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob") val host1 = "host1" @@ -308,9 +346,9 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { acls = changeAclAndVerify(acls, Set(acl5), Set.empty) //test get by principal name. - TestUtils.waitUntilTrue(() => Set(acl1, acl2).map(acl => new AclBinding(resource, acl)) == getAcls(aclAuthorizer, user1), + TestUtils.waitUntilTrue(() => Set(acl1, acl2).map(acl => new AclBinding(resource, acl)) == getAcls(authorizer1, user1), "changes not propagated in timeout period") - TestUtils.waitUntilTrue(() => Set(acl3, acl4, acl5).map(acl => new AclBinding(resource, acl)) == getAcls(aclAuthorizer, user2), + TestUtils.waitUntilTrue(() => Set(acl3, acl4, acl5).map(acl => new AclBinding(resource, acl)) == getAcls(authorizer1, user2), "changes not propagated in timeout period") val resourceToAcls = Map[ResourcePattern, Set[AccessControlEntry]]( @@ -324,20 +362,24 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { val expectedAcls = (resourceToAcls + (resource -> acls)).flatMap { case (res, resAcls) => resAcls.map { acl => new AclBinding(res, acl) } }.toSet - TestUtils.waitUntilTrue(() => expectedAcls == getAcls(aclAuthorizer), "changes not propagated in timeout period.") + TestUtils.waitUntilTrue(() => expectedAcls == getAcls(authorizer1), "changes not propagated in timeout period.") //test remove acl from existing acls. acls = changeAclAndVerify(acls, Set.empty, Set(acl1, acl5)) //test remove all acls for resource - removeAcls(aclAuthorizer, Set.empty, resource) - TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], aclAuthorizer, resource) - assertFalse(zkClient.resourceExists(resource)) + removeAcls(authorizer1, Set.empty, resource) + TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], authorizer1, resource) + if (quorum.equals(ZK)) { + assertFalse(zkClient.resourceExists(resource)) + } //test removing last acl also deletes ZooKeeper path acls = changeAclAndVerify(Set.empty, Set(acl1), Set.empty) changeAclAndVerify(acls, Set.empty, acls) - assertFalse(zkClient.resourceExists(resource)) + if (quorum.equals(ZK)) { + assertFalse(zkClient.resourceExists(resource)) + } } @Test @@ -345,18 +387,18 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username) val acl1 = new AccessControlEntry(user1.toString, "host-1", READ, ALLOW) val acls = Set(acl1) - addAcls(aclAuthorizer, acls, resource) + addAcls(authorizer1, acls, resource) val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob") val resource1 = new ResourcePattern(TOPIC, "test-2", LITERAL) val acl2 = new AccessControlEntry(user2.toString, "host3", READ, DENY) val acls1 = Set(acl2) - addAcls(aclAuthorizer, acls1, resource1) + addAcls(authorizer1, acls1, resource1) zkClient.deleteAclChangeNotifications() - val authorizer = new AclAuthorizer + var authorizer: Authorizer = null try { - authorizer.configure(config.originals) + authorizer = createAclAuthorizer(config.originals) assertEquals(acls, getAcls(authorizer, resource)) assertEquals(acls1, getAcls(authorizer, resource1)) @@ -387,7 +429,7 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { configureSemaphore.acquire() val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username) val acls = Set(new AccessControlEntry(user1.toString, "host-1", READ, DENY)) - addAcls(aclAuthorizer, acls, resource) + addAcls(authorizer1, acls, resource) listenerSemaphore.release() future.get(10, TimeUnit.SECONDS) @@ -409,10 +451,10 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob") val acl2 = new AccessControlEntry(user2.toString, WildcardHost, READ, DENY) - addAcls(aclAuthorizer, Set(acl1), commonResource) - addAcls(aclAuthorizer, Set(acl2), commonResource) + addAcls(authorizer1, Set(acl1), commonResource) + addAcls(authorizer1, Set(acl2), commonResource) - TestUtils.waitAndVerifyAcls(Set(acl1, acl2), aclAuthorizer, commonResource) + TestUtils.waitAndVerifyAcls(Set(acl1, acl2), authorizer1, commonResource) } @Test @@ -426,23 +468,23 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { val acl2 = new AccessControlEntry(user2.toString, WildcardHost, READ, DENY) // Add on each instance - addAcls(aclAuthorizer, Set(acl1), commonResource) - addAcls(aclAuthorizer2, Set(acl2), commonResource) + addAcls(authorizer1, Set(acl1), commonResource) + addAcls(authorizer2, Set(acl2), commonResource) - TestUtils.waitAndVerifyAcls(Set(acl1, acl2), aclAuthorizer, commonResource) - TestUtils.waitAndVerifyAcls(Set(acl1, acl2), aclAuthorizer2, commonResource) + TestUtils.waitAndVerifyAcls(Set(acl1, acl2), authorizer1, commonResource) + TestUtils.waitAndVerifyAcls(Set(acl1, acl2), authorizer2, commonResource) val user3 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "joe") val acl3 = new AccessControlEntry(user3.toString, WildcardHost, READ, DENY) // Add on one instance and delete on another - addAcls(aclAuthorizer, Set(acl3), commonResource) - val deleted = removeAcls(aclAuthorizer2, Set(acl3), commonResource) + addAcls(authorizer1, Set(acl3), commonResource) + val deleted = removeAcls(authorizer2, Set(acl3), commonResource) assertTrue(deleted, "The authorizer should see a value that needs to be deleted") - TestUtils.waitAndVerifyAcls(Set(acl1, acl2), aclAuthorizer, commonResource) - TestUtils.waitAndVerifyAcls(Set(acl1, acl2), aclAuthorizer2, commonResource) + TestUtils.waitAndVerifyAcls(Set(acl1, acl2), authorizer1, commonResource) + TestUtils.waitAndVerifyAcls(Set(acl1, acl2), authorizer2, commonResource) } @Test @@ -458,12 +500,12 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { val concurrentFuctions = acls.map { case (acl, aclId) => () => { if (aclId % 2 == 0) { - addAcls(aclAuthorizer, Set(acl), commonResource) + addAcls(authorizer1, Set(acl), commonResource) } else { - addAcls(aclAuthorizer2, Set(acl), commonResource) + addAcls(authorizer2, Set(acl), commonResource) } if (aclId % 10 == 0) { - removeAcls(aclAuthorizer2, Set(acl), commonResource) + removeAcls(authorizer2, Set(acl), commonResource) } } } @@ -474,15 +516,16 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { TestUtils.assertConcurrent("Should support many concurrent calls", concurrentFuctions, 30 * 1000) - TestUtils.waitAndVerifyAcls(expectedAcls, aclAuthorizer, commonResource) - TestUtils.waitAndVerifyAcls(expectedAcls, aclAuthorizer2, commonResource) + TestUtils.waitAndVerifyAcls(expectedAcls, authorizer1, commonResource) + TestUtils.waitAndVerifyAcls(expectedAcls, authorizer2, commonResource) } /** * Test ACL inheritance, as described in #{org.apache.kafka.common.acl.AclOperation} */ - @Test - def testAclInheritance(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array(KRAFT, ZK)) + def testAclInheritance(quorum: String): Unit = { testImplicationsOfAllow(AclOperation.ALL, Set(READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE, CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, CREATE_TOKENS, DESCRIBE_TOKENS)) testImplicationsOfDeny(AclOperation.ALL, Set(READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE, @@ -501,15 +544,15 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { val host = InetAddress.getByName("192.168.3.1") val hostContext = newRequestContext(user, host) val acl = new AccessControlEntry(user.toString, WildcardHost, parentOp, ALLOW) - addAcls(aclAuthorizer, Set(acl), clusterResource) + addAcls(authorizer1, Set(acl), clusterResource) AclOperation.values.filter(validOp).foreach { op => - val authorized = authorize(aclAuthorizer, hostContext, op, clusterResource) + val authorized = authorize(authorizer1, hostContext, op, clusterResource) if (allowedOps.contains(op) || op == parentOp) assertTrue(authorized, s"ALLOW $parentOp should imply ALLOW $op") else assertFalse(authorized, s"ALLOW $parentOp should not imply ALLOW $op") } - removeAcls(aclAuthorizer, Set(acl), clusterResource) + removeAcls(authorizer1, Set(acl), clusterResource) } private def testImplicationsOfDeny(parentOp: AclOperation, deniedOps: Set[AclOperation]): Unit = { @@ -518,15 +561,15 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { val host1Context = newRequestContext(user1, host1) val acls = Set(new AccessControlEntry(user1.toString, WildcardHost, parentOp, DENY), new AccessControlEntry(user1.toString, WildcardHost, AclOperation.ALL, ALLOW)) - addAcls(aclAuthorizer, acls, clusterResource) + addAcls(authorizer1, acls, clusterResource) AclOperation.values.filter(validOp).foreach { op => - val authorized = authorize(aclAuthorizer, host1Context, op, clusterResource) + val authorized = authorize(authorizer1, host1Context, op, clusterResource) if (deniedOps.contains(op) || op == parentOp) assertFalse(authorized, s"DENY $parentOp should imply DENY $op") else assertTrue(authorized, s"DENY $parentOp should not imply DENY $op") } - removeAcls(aclAuthorizer, acls, clusterResource) + removeAcls(authorizer1, acls, clusterResource) } @Test @@ -536,151 +579,164 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { // Alternate authorizer to keep adding and removing ZooKeeper path val concurrentFuctions = (0 to 50).map { _ => () => { - addAcls(aclAuthorizer, Set(acl), resource) - removeAcls(aclAuthorizer2, Set(acl), resource) + addAcls(authorizer1, Set(acl), resource) + removeAcls(authorizer2, Set(acl), resource) } } TestUtils.assertConcurrent("Should support many concurrent calls", concurrentFuctions, 30 * 1000) - TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], aclAuthorizer, resource) - TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], aclAuthorizer2, resource) + TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], authorizer1, resource) + TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], authorizer2, resource) } - @Test - def testAccessAllowedIfAllowAclExistsOnWildcardResource(): Unit = { - addAcls(aclAuthorizer, Set(allowReadAcl), wildCardResource) + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array(KRAFT, ZK)) + def testAccessAllowedIfAllowAclExistsOnWildcardResource(quorum: String): Unit = { + addAcls(authorizer1, Set(allowReadAcl), wildCardResource) - assertTrue(authorize(aclAuthorizer, requestContext, READ, resource)) + assertTrue(authorize(authorizer1, requestContext, READ, resource)) } - @Test - def testDeleteAclOnWildcardResource(): Unit = { - addAcls(aclAuthorizer, Set(allowReadAcl, allowWriteAcl), wildCardResource) + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array(KRAFT, ZK)) + def testDeleteAclOnWildcardResource(quorum: String): Unit = { + addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), wildCardResource) - removeAcls(aclAuthorizer, Set(allowReadAcl), wildCardResource) + removeAcls(authorizer1, Set(allowReadAcl), wildCardResource) - assertEquals(Set(allowWriteAcl), getAcls(aclAuthorizer, wildCardResource)) + assertEquals(Set(allowWriteAcl), getAcls(authorizer1, wildCardResource)) } - @Test - def testDeleteAllAclOnWildcardResource(): Unit = { - addAcls(aclAuthorizer, Set(allowReadAcl), wildCardResource) + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array(KRAFT, ZK)) + def testDeleteAllAclOnWildcardResource(quorum: String): Unit = { + addAcls(authorizer1, Set(allowReadAcl), wildCardResource) - removeAcls(aclAuthorizer, Set.empty, wildCardResource) + removeAcls(authorizer1, Set.empty, wildCardResource) - assertEquals(Set.empty, getAcls(aclAuthorizer)) + assertEquals(Set.empty, getAcls(authorizer1)) } - @Test - def testAccessAllowedIfAllowAclExistsOnPrefixedResource(): Unit = { - addAcls(aclAuthorizer, Set(allowReadAcl), prefixedResource) + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array(KRAFT, ZK)) + def testAccessAllowedIfAllowAclExistsOnPrefixedResource(quorum: String): Unit = { + addAcls(authorizer1, Set(allowReadAcl), prefixedResource) - assertTrue(authorize(aclAuthorizer, requestContext, READ, resource)) + assertTrue(authorize(authorizer1, requestContext, READ, resource)) } - @Test - def testDeleteAclOnPrefixedResource(): Unit = { - addAcls(aclAuthorizer, Set(allowReadAcl, allowWriteAcl), prefixedResource) + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array(KRAFT, ZK)) + def testDeleteAclOnPrefixedResource(quorum: String): Unit = { + addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), prefixedResource) - removeAcls(aclAuthorizer, Set(allowReadAcl), prefixedResource) + removeAcls(authorizer1, Set(allowReadAcl), prefixedResource) - assertEquals(Set(allowWriteAcl), getAcls(aclAuthorizer, prefixedResource)) + assertEquals(Set(allowWriteAcl), getAcls(authorizer1, prefixedResource)) } - @Test - def testDeleteAllAclOnPrefixedResource(): Unit = { - addAcls(aclAuthorizer, Set(allowReadAcl, allowWriteAcl), prefixedResource) + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array(KRAFT, ZK)) + def testDeleteAllAclOnPrefixedResource(quorum: String): Unit = { + addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), prefixedResource) - removeAcls(aclAuthorizer, Set.empty, prefixedResource) + removeAcls(authorizer1, Set.empty, prefixedResource) - assertEquals(Set.empty, getAcls(aclAuthorizer)) + assertEquals(Set.empty, getAcls(authorizer1)) } - @Test - def testAddAclsOnLiteralResource(): Unit = { - addAcls(aclAuthorizer, Set(allowReadAcl, allowWriteAcl), resource) - addAcls(aclAuthorizer, Set(allowWriteAcl, denyReadAcl), resource) + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array(KRAFT, ZK)) + def testAddAclsOnLiteralResource(quorum: String): Unit = { + addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), resource) + addAcls(authorizer1, Set(allowWriteAcl, denyReadAcl), resource) - assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), getAcls(aclAuthorizer, resource)) - assertEquals(Set.empty, getAcls(aclAuthorizer, wildCardResource)) - assertEquals(Set.empty, getAcls(aclAuthorizer, prefixedResource)) + assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), getAcls(authorizer1, resource)) + assertEquals(Set.empty, getAcls(authorizer1, wildCardResource)) + assertEquals(Set.empty, getAcls(authorizer1, prefixedResource)) } - @Test - def testAddAclsOnWildcardResource(): Unit = { - addAcls(aclAuthorizer, Set(allowReadAcl, allowWriteAcl), wildCardResource) - addAcls(aclAuthorizer, Set(allowWriteAcl, denyReadAcl), wildCardResource) + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array(KRAFT, ZK)) + def testAddAclsOnWildcardResource(quorum: String): Unit = { + addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), wildCardResource) + addAcls(authorizer1, Set(allowWriteAcl, denyReadAcl), wildCardResource) - assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), getAcls(aclAuthorizer, wildCardResource)) - assertEquals(Set.empty, getAcls(aclAuthorizer, resource)) - assertEquals(Set.empty, getAcls(aclAuthorizer, prefixedResource)) + assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), getAcls(authorizer1, wildCardResource)) + assertEquals(Set.empty, getAcls(authorizer1, resource)) + assertEquals(Set.empty, getAcls(authorizer1, prefixedResource)) } - @Test - def testAddAclsOnPrefixedResource(): Unit = { - addAcls(aclAuthorizer, Set(allowReadAcl, allowWriteAcl), prefixedResource) - addAcls(aclAuthorizer, Set(allowWriteAcl, denyReadAcl), prefixedResource) + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array(KRAFT, ZK)) + def testAddAclsOnPrefixedResource(quorum: String): Unit = { + addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), prefixedResource) + addAcls(authorizer1, Set(allowWriteAcl, denyReadAcl), prefixedResource) - assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), getAcls(aclAuthorizer, prefixedResource)) - assertEquals(Set.empty, getAcls(aclAuthorizer, wildCardResource)) - assertEquals(Set.empty, getAcls(aclAuthorizer, resource)) + assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), getAcls(authorizer1, prefixedResource)) + assertEquals(Set.empty, getAcls(authorizer1, wildCardResource)) + assertEquals(Set.empty, getAcls(authorizer1, resource)) } - @Test - def testAuthorizeWithPrefixedResource(): Unit = { - addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "a_other", LITERAL)) - addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "a_other", PREFIXED)) - addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), PREFIXED)) - addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), PREFIXED)) - addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID() + "-zzz", PREFIXED)) - addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "fooo-" + UUID.randomUUID(), PREFIXED)) - addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "fo-" + UUID.randomUUID(), PREFIXED)) - addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "fop-" + UUID.randomUUID(), PREFIXED)) - addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "fon-" + UUID.randomUUID(), PREFIXED)) - addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "fon-", PREFIXED)) - addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "z_other", PREFIXED)) - addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "z_other", LITERAL)) - - addAcls(aclAuthorizer, Set(allowReadAcl), prefixedResource) - - assertTrue(authorize(aclAuthorizer, requestContext, READ, resource)) + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array(KRAFT, ZK)) + def testAuthorizeWithPrefixedResource(quorum: String): Unit = { + addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "a_other", LITERAL)) + addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "a_other", PREFIXED)) + addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), PREFIXED)) + addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), PREFIXED)) + addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID() + "-zzz", PREFIXED)) + addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "fooo-" + UUID.randomUUID(), PREFIXED)) + addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "fo-" + UUID.randomUUID(), PREFIXED)) + addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "fop-" + UUID.randomUUID(), PREFIXED)) + addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "fon-" + UUID.randomUUID(), PREFIXED)) + addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "fon-", PREFIXED)) + addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "z_other", PREFIXED)) + addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "z_other", LITERAL)) + + addAcls(authorizer1, Set(allowReadAcl), prefixedResource) + + assertTrue(authorize(authorizer1, requestContext, READ, resource)) } - @Test - def testSingleCharacterResourceAcls(): Unit = { - addAcls(aclAuthorizer, Set(allowReadAcl), new ResourcePattern(TOPIC, "f", LITERAL)) - assertTrue(authorize(aclAuthorizer, requestContext, READ, new ResourcePattern(TOPIC, "f", LITERAL))) - assertFalse(authorize(aclAuthorizer, requestContext, READ, new ResourcePattern(TOPIC, "foo", LITERAL))) - - addAcls(aclAuthorizer, Set(allowReadAcl), new ResourcePattern(TOPIC, "_", PREFIXED)) - assertTrue(authorize(aclAuthorizer, requestContext, READ, new ResourcePattern(TOPIC, "_foo", LITERAL))) - assertTrue(authorize(aclAuthorizer, requestContext, READ, new ResourcePattern(TOPIC, "_", LITERAL))) - assertFalse(authorize(aclAuthorizer, requestContext, READ, new ResourcePattern(TOPIC, "foo_", LITERAL))) + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array(KRAFT, ZK)) + def testSingleCharacterResourceAcls(quorum: String): Unit = { + addAcls(authorizer1, Set(allowReadAcl), new ResourcePattern(TOPIC, "f", LITERAL)) + assertTrue(authorize(authorizer1, requestContext, READ, new ResourcePattern(TOPIC, "f", LITERAL))) + assertFalse(authorize(authorizer1, requestContext, READ, new ResourcePattern(TOPIC, "foo", LITERAL))) + + addAcls(authorizer1, Set(allowReadAcl), new ResourcePattern(TOPIC, "_", PREFIXED)) + assertTrue(authorize(authorizer1, requestContext, READ, new ResourcePattern(TOPIC, "_foo", LITERAL))) + assertTrue(authorize(authorizer1, requestContext, READ, new ResourcePattern(TOPIC, "_", LITERAL))) + assertFalse(authorize(authorizer1, requestContext, READ, new ResourcePattern(TOPIC, "foo_", LITERAL))) } - @Test - def testGetAclsPrincipal(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array(KRAFT, ZK)) + def testGetAclsPrincipal(quorum: String): Unit = { val aclOnSpecificPrincipal = new AccessControlEntry(principal.toString, WildcardHost, WRITE, ALLOW) - addAcls(aclAuthorizer, Set(aclOnSpecificPrincipal), resource) + addAcls(authorizer1, Set(aclOnSpecificPrincipal), resource) assertEquals(0, - getAcls(aclAuthorizer, wildcardPrincipal).size, "acl on specific should not be returned for wildcard request") + getAcls(authorizer1, wildcardPrincipal).size, "acl on specific should not be returned for wildcard request") assertEquals(1, - getAcls(aclAuthorizer, principal).size, "acl on specific should be returned for specific request") + getAcls(authorizer1, principal).size, "acl on specific should be returned for specific request") assertEquals(1, - getAcls(aclAuthorizer, new KafkaPrincipal(principal.getPrincipalType, principal.getName)).size, "acl on specific should be returned for different principal instance") + getAcls(authorizer1, new KafkaPrincipal(principal.getPrincipalType, principal.getName)).size, "acl on specific should be returned for different principal instance") - removeAcls(aclAuthorizer, Set.empty, resource) + removeAcls(authorizer1, Set.empty, resource) val aclOnWildcardPrincipal = new AccessControlEntry(WildcardPrincipalString, WildcardHost, WRITE, ALLOW) - addAcls(aclAuthorizer, Set(aclOnWildcardPrincipal), resource) + addAcls(authorizer1, Set(aclOnWildcardPrincipal), resource) - assertEquals(1, getAcls(aclAuthorizer, wildcardPrincipal).size, "acl on wildcard should be returned for wildcard request") - assertEquals(0, getAcls(aclAuthorizer, principal).size, "acl on wildcard should not be returned for specific request") + assertEquals(1, getAcls(authorizer1, wildcardPrincipal).size, "acl on wildcard should be returned for wildcard request") + assertEquals(0, getAcls(authorizer1, principal).size, "acl on wildcard should not be returned for specific request") } - @Test - def testAclsFilter(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array(KRAFT, ZK)) + def testAclsFilter(quorum: String): Unit = { val resource1 = new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), LITERAL) val resource2 = new ResourcePattern(TOPIC, "bar-" + UUID.randomUUID(), LITERAL) val prefixedResource = new ResourcePattern(TOPIC, "bar-", PREFIXED) @@ -690,25 +746,31 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { val acl3 = new AclBinding(resource2, new AccessControlEntry(principal.toString, WildcardHost, DESCRIBE, ALLOW)) val acl4 = new AclBinding(prefixedResource, new AccessControlEntry(wildcardPrincipal.toString, WildcardHost, READ, ALLOW)) - aclAuthorizer.createAcls(requestContext, List(acl1, acl2, acl3, acl4).asJava) - assertEquals(Set(acl1, acl2, acl3, acl4), aclAuthorizer.acls(AclBindingFilter.ANY).asScala.toSet) - assertEquals(Set(acl1, acl2), aclAuthorizer.acls(new AclBindingFilter(resource1.toFilter, AccessControlEntryFilter.ANY)).asScala.toSet) - assertEquals(Set(acl4), aclAuthorizer.acls(new AclBindingFilter(prefixedResource.toFilter, AccessControlEntryFilter.ANY)).asScala.toSet) + authorizer1.createAcls(requestContext, List(acl1, acl2, acl3, acl4).asJava) + assertEquals(Set(acl1, acl2, acl3, acl4), authorizer1.acls(AclBindingFilter.ANY).asScala.toSet) + assertEquals(Set(acl1, acl2), authorizer1.acls(new AclBindingFilter(resource1.toFilter, AccessControlEntryFilter.ANY)).asScala.toSet) + assertEquals(Set(acl4), authorizer1.acls(new AclBindingFilter(prefixedResource.toFilter, AccessControlEntryFilter.ANY)).asScala.toSet) val matchingFilter = new AclBindingFilter(new ResourcePatternFilter(ResourceType.ANY, resource2.name, MATCH), AccessControlEntryFilter.ANY) - assertEquals(Set(acl3, acl4), aclAuthorizer.acls(matchingFilter).asScala.toSet) + assertEquals(Set(acl3, acl4), authorizer1.acls(matchingFilter).asScala.toSet) val filters = List(matchingFilter, acl1.toFilter, new AclBindingFilter(resource2.toFilter, AccessControlEntryFilter.ANY), new AclBindingFilter(new ResourcePatternFilter(TOPIC, "baz", PatternType.ANY), AccessControlEntryFilter.ANY)) - val deleteResults = aclAuthorizer.deleteAcls(requestContext, filters.asJava).asScala.map(_.toCompletableFuture.get) + val deleteResults = authorizer1.deleteAcls(requestContext, filters.asJava).asScala.map(_.toCompletableFuture.get) assertEquals(List.empty, deleteResults.filter(_.exception.isPresent)) filters.indices.foreach { i => assertEquals(Set.empty, deleteResults(i).aclBindingDeleteResults.asScala.toSet.filter(_.exception.isPresent)) } assertEquals(Set(acl3, acl4), deleteResults(0).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet) assertEquals(Set(acl1), deleteResults(1).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet) - assertEquals(Set.empty, deleteResults(2).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet) + if (quorum.equals(ZK)) { + assertEquals(Set.empty, deleteResults(2).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet) + } else { + // standard authorizer first finds the acls that match filters and then delete them. + // So filters[2] will match acl3 even though it is also matching filters[0] and will be deleted by it + assertEquals(Set(acl3), deleteResults(2).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet) + } assertEquals(Set.empty, deleteResults(3).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet) } @@ -716,14 +778,14 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { def testThrowsOnAddPrefixedAclIfInterBrokerProtocolVersionTooLow(): Unit = { givenAuthorizerWithProtocolVersion(Option(IBP_2_0_IV0)) val e = assertThrows(classOf[ApiException], - () => addAcls(aclAuthorizer, Set(denyReadAcl), new ResourcePattern(TOPIC, "z_other", PREFIXED))) + () => addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "z_other", PREFIXED))) assertTrue(e.getCause.isInstanceOf[UnsupportedVersionException], s"Unexpected exception $e") } @Test def testCreateAclWithInvalidResourceName(): Unit = { assertThrows(classOf[ApiException], - () => addAcls(aclAuthorizer, Set(allowReadAcl), new ResourcePattern(TOPIC, "test/1", LITERAL))) + () => addAcls(authorizer1, Set(allowReadAcl), new ResourcePattern(TOPIC, "test/1", LITERAL))) } @Test @@ -733,7 +795,7 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { val expected = new String(ZkAclStore(PREFIXED).changeStore .createChangeNode(resource).bytes, UTF_8) - addAcls(aclAuthorizer, Set(denyReadAcl), resource) + addAcls(authorizer1, Set(denyReadAcl), resource) val actual = getAclChangeEventAsString(PREFIXED) @@ -747,7 +809,7 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { val expected = new String(ZkAclStore(PREFIXED).changeStore .createChangeNode(resource).bytes, UTF_8) - addAcls(aclAuthorizer, Set(denyReadAcl), resource) + addAcls(authorizer1, Set(denyReadAcl), resource) val actual = getAclChangeEventAsString(PREFIXED) @@ -761,7 +823,7 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { val expected = new String(ZkAclStore(LITERAL).changeStore .createChangeNode(resource).bytes, UTF_8) - addAcls(aclAuthorizer, Set(denyReadAcl), resource) + addAcls(authorizer1, Set(denyReadAcl), resource) val actual = getAclChangeEventAsString(LITERAL) @@ -775,7 +837,7 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { val expected = new String(ZkAclStore(LITERAL).changeStore .createChangeNode(resource).bytes, UTF_8) - addAcls(aclAuthorizer, Set(denyReadAcl), resource) + addAcls(authorizer1, Set(denyReadAcl), resource) val actual = getAclChangeEventAsString(LITERAL) @@ -914,14 +976,14 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { val ace = new AccessControlEntry(principal.toString, WildcardHost, READ, ALLOW) val updateSemaphore = new Semaphore(1) - def createAcl(createAuthorizer: AclAuthorizer, resource: ResourcePattern): AclBinding = { + def createAcl(createAuthorizer: Authorizer, resource: ResourcePattern): AclBinding = { val acl = new AclBinding(resource, ace) createAuthorizer.createAcls(requestContext, Collections.singletonList(acl)).asScala .foreach(_.toCompletableFuture.get(15, TimeUnit.SECONDS)) acl } - def deleteAcl(deleteAuthorizer: AclAuthorizer, + def deleteAcl(deleteAuthorizer: Authorizer, resource: ResourcePattern, deletePatternType: PatternType): List[AclBinding] = { @@ -935,16 +997,16 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { .toList } - def listAcls(authorizer: AclAuthorizer): List[AclBinding] = { + def listAcls(authorizer: Authorizer): List[AclBinding] = { authorizer.acls(AclBindingFilter.ANY).asScala.toList } - def verifyCreateDeleteAcl(deleteAuthorizer: AclAuthorizer, + def verifyCreateDeleteAcl(deleteAuthorizer: Authorizer, resource: ResourcePattern, deletePatternType: PatternType): Unit = { updateSemaphore.acquire() assertEquals(List.empty, listAcls(deleteAuthorizer)) - val acl = createAcl(aclAuthorizer, resource) + val acl = createAcl(authorizer1, resource) val deleted = deleteAcl(deleteAuthorizer, resource, deletePatternType) if (deletePatternType != PatternType.MATCH) { assertEquals(List(acl), deleted) @@ -982,34 +1044,35 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { } } - @Test - def testAuthorizeByResourceTypeNoAclFoundOverride(): Unit = { - val props = TestUtils.createBrokerConfig(1, zkConnect) + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array(KRAFT, ZK)) + def testAuthorizeByResourceTypeNoAclFoundOverride(quorum: String): Unit = { + val props = properties props.put(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true") val cfg = KafkaConfig.fromProps(props) - val aclAuthorizer = new AclAuthorizer + var authorizer: Authorizer = null try { - aclAuthorizer.configure(cfg.originals) - assertTrue(authorizeByResourceType(aclAuthorizer, requestContext, READ, resource.resourceType()), + authorizer = createAuthorizer(cfg.originals) + assertTrue(authorizeByResourceType(authorizer, requestContext, READ, resource.resourceType()), "If allow.everyone.if.no.acl.found = true, caller should have read access to at least one topic") - assertTrue(authorizeByResourceType(aclAuthorizer, requestContext, WRITE, resource.resourceType()), + assertTrue(authorizeByResourceType(authorizer, requestContext, WRITE, resource.resourceType()), "If allow.everyone.if.no.acl.found = true, caller should have write access to at least one topic") } finally { - aclAuthorizer.close() + authorizer.close() } } private def givenAuthorizerWithProtocolVersion(protocolVersion: Option[MetadataVersion]): Unit = { - aclAuthorizer.close() + authorizer1.close() - val props = TestUtils.createBrokerConfig(0, zkConnect) + val props = TestUtils.createBrokerConfig(0, zkConnectOrNull) props.put(AclAuthorizer.SuperUsersProp, superUsers) protocolVersion.foreach(version => props.put(KafkaConfig.InterBrokerProtocolVersionProp, version.toString)) config = KafkaConfig.fromProps(props) - aclAuthorizer.configure(config.originals) + authorizer1.configure(config.originals) } private def getAclChangeEventAsString(patternType: PatternType) = { @@ -1031,37 +1094,37 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { var acls = originalAcls if(addedAcls.nonEmpty) { - addAcls(aclAuthorizer, addedAcls, resource) + addAcls(authorizer1, addedAcls, resource) acls ++= addedAcls } if(removedAcls.nonEmpty) { - removeAcls(aclAuthorizer, removedAcls, resource) + removeAcls(authorizer1, removedAcls, resource) acls --=removedAcls } - TestUtils.waitAndVerifyAcls(acls, aclAuthorizer, resource) + TestUtils.waitAndVerifyAcls(acls, authorizer1, resource) acls } - private def authorize(authorizer: AclAuthorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = { + private def authorize(authorizer: Authorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = { val action = new Action(operation, resource, 1, true, true) authorizer.authorize(requestContext, List(action).asJava).asScala.head == AuthorizationResult.ALLOWED } - private def getAcls(authorizer: AclAuthorizer, resourcePattern: ResourcePattern): Set[AccessControlEntry] = { + private def getAcls(authorizer: Authorizer, resourcePattern: ResourcePattern): Set[AccessControlEntry] = { val acls = authorizer.acls(new AclBindingFilter(resourcePattern.toFilter, AccessControlEntryFilter.ANY)).asScala.toSet acls.map(_.entry) } - private def getAcls(authorizer: AclAuthorizer, principal: KafkaPrincipal): Set[AclBinding] = { + private def getAcls(authorizer: Authorizer, principal: KafkaPrincipal): Set[AclBinding] = { val filter = new AclBindingFilter(ResourcePatternFilter.ANY, new AccessControlEntryFilter(principal.toString, null, AclOperation.ANY, AclPermissionType.ANY)) authorizer.acls(filter).asScala.toSet } - private def getAcls(authorizer: AclAuthorizer): Set[AclBinding] = { + private def getAcls(authorizer: Authorizer): Set[AclBinding] = { authorizer.acls(AclBindingFilter.ANY).asScala.toSet } @@ -1084,4 +1147,35 @@ class AclAuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { file.getAbsolutePath } finally writer.close() } + + def createAuthorizer(configs: util.Map[String, AnyRef]): Authorizer = { + var testAuthorizer: Authorizer = null + if (TestInfoUtils.isKRaft(_testInfo)) { + testAuthorizer = createStandardAuthorizer(configs) + } else { + testAuthorizer = createAclAuthorizer(configs) + } + testAuthorizer + } + + def createAclAuthorizer(configs: util.Map[String, AnyRef]): AclAuthorizer = { + val authorizer = new AclAuthorizer + authorizer.configure(configs) + authorizer + } + + def createStandardAuthorizer(configs: util.Map[String, AnyRef]): StandardAuthorizer = { + val standardAuthorizer = new StandardAuthorizer + standardAuthorizer.configure(configs) + initializeStandardAuthorizer(standardAuthorizer, new AuthorizerTestServerInfo(Collections.singletonList(PLAINTEXT))) + standardAuthorizer + } + + def initializeStandardAuthorizer(standardAuthorizer: StandardAuthorizer, + serverInfo: AuthorizerServerInfo): Unit = { + val aclMutator = new MockAclMutator(standardAuthorizer) + standardAuthorizer.start(serverInfo) + standardAuthorizer.setAclMutator(aclMutator) + standardAuthorizer.completeInitialLoad() + } } diff --git a/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java index 313927fc537..7569e88ef0f 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java @@ -139,6 +139,9 @@ public class AclControlManager { throw new InvalidRequestException("Invalid permissionType " + binding.entry().permissionType()); } + if (binding.pattern().name() == null || binding.pattern().name().isEmpty()) { + throw new InvalidRequestException("Resource name should not be empty"); + } } ControllerResult<List<AclDeleteResult>> deleteAcls(List<AclBindingFilter> filters) { diff --git a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java index f456ff4d772..c6d41a52da4 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java @@ -259,6 +259,9 @@ public class StandardAuthorizerData { AuthorizableRequestContext requestContext, Action action ) { + if (action.resourcePattern().patternType() != LITERAL) { + throw new IllegalArgumentException("Only literal resources are supported. Got: " + action.resourcePattern().patternType()); + } KafkaPrincipal principal = baseKafkaPrincipal(requestContext); final MatchingRule rule; diff --git a/metadata/src/test/java/org/apache/kafka/controller/MockAclControlManager.java b/metadata/src/test/java/org/apache/kafka/controller/MockAclControlManager.java new file mode 100644 index 00000000000..16d71e8786c --- /dev/null +++ b/metadata/src/test/java/org/apache/kafka/controller/MockAclControlManager.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; +import org.apache.kafka.common.metadata.AccessControlEntryRecord; +import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer; +import org.apache.kafka.server.authorizer.AclCreateResult; +import org.apache.kafka.server.authorizer.AclDeleteResult; +import org.apache.kafka.timeline.SnapshotRegistry; + +import java.util.List; +import java.util.Optional; + +public class MockAclControlManager extends AclControlManager { + public MockAclControlManager(LogContext logContext, + Optional<ClusterMetadataAuthorizer> authorizer) { + super(new SnapshotRegistry(logContext), authorizer); + } + + public List<AclCreateResult> createAndReplayAcls(List<AclBinding> acls) { + ControllerResult<List<AclCreateResult>> createResults = createAcls(acls); + createResults.records().forEach(record -> replay((AccessControlEntryRecord) record.message(), Optional.empty())); + return createResults.response(); + } + + public List<AclDeleteResult> deleteAndReplayAcls(List<AclBindingFilter> filters) { + ControllerResult<List<AclDeleteResult>> deleteResults = deleteAcls(filters); + deleteResults.records().forEach(record -> replay((RemoveAccessControlEntryRecord) record.message(), Optional.empty())); + return deleteResults.response(); + } +} diff --git a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/MockAclMutator.java b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/MockAclMutator.java new file mode 100644 index 00000000000..188a8dc69cc --- /dev/null +++ b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/MockAclMutator.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.authorizer; + +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.controller.ControllerRequestContext; +import org.apache.kafka.controller.MockAclControlManager; +import org.apache.kafka.server.authorizer.AclCreateResult; +import org.apache.kafka.server.authorizer.AclDeleteResult; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +public class MockAclMutator implements AclMutator { + MockAclControlManager aclControlManager; + + public MockAclMutator(StandardAuthorizer authorizer) { + aclControlManager = createAclControlManager(authorizer); + } + + private MockAclControlManager createAclControlManager(StandardAuthorizer standardAuthorizer) { + LogContext logContext = new LogContext(); + return new MockAclControlManager(logContext, Optional.of(standardAuthorizer)); + } + + @Override + public CompletableFuture<List<AclCreateResult>> createAcls( + ControllerRequestContext context, + List<AclBinding> aclBindings + ) { + CompletableFuture<List<AclCreateResult>> future = new CompletableFuture<>(); + future.complete(aclControlManager.createAndReplayAcls(aclBindings)); + return future; + } + + @Override + public CompletableFuture<List<AclDeleteResult>> deleteAcls( + ControllerRequestContext context, + List<AclBindingFilter> aclBindingFilters + ) { + CompletableFuture<List<AclDeleteResult>> future = new CompletableFuture<>(); + future.complete(aclControlManager.deleteAndReplayAcls(aclBindingFilters)); + return future; + } +} \ No newline at end of file diff --git a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java index 3966054e744..d9fcff1b9b3 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java @@ -103,10 +103,10 @@ public class StandardAuthorizerTest { "127.0.0.1", 9020); - static class AuthorizerTestServerInfo implements AuthorizerServerInfo { + public static class AuthorizerTestServerInfo implements AuthorizerServerInfo { private final Collection<Endpoint> endpoints; - AuthorizerTestServerInfo(Collection<Endpoint> endpoints) { + public AuthorizerTestServerInfo(Collection<Endpoint> endpoints) { assertFalse(endpoints.isEmpty()); this.endpoints = endpoints; }