[ https://issues.apache.org/jira/browse/KAFKA-6912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499253#comment-16499253 ]
ASF GitHub Bot commented on KAFKA-6912: --------------------------------------- lindong28 closed pull request #5030: KAFKA-6912: Add test for authorization with custom principal types URL: https://github.com/apache/kafka/pull/5030 This PR was merged from a forked repository. Because GitHub hides the original diff of a fork's pull request once it has been merged, the diff is reproduced below for the sake of provenance: diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index da45be2e6be..1d9febfb334 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -56,6 +56,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { override def numBrokers: Int = 1 val brokerId: Integer = 0 + def userPrincipal = KafkaPrincipal.ANONYMOUS val topic = "topic" val topicPattern = "topic.*" @@ -76,23 +77,23 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val deleteTopicResource = new Resource(Topic, deleteTopic) val transactionalIdResource = new Resource(TransactionalId, transactionalId) - val groupReadAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read))) - val groupDescribeAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe))) - val groupDeleteAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete))) - val clusterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction))) - val clusterCreateAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create))) - val clusterAlterAcl = Map(Resource.ClusterResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Alter))) - val clusterDescribeAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe))) - val clusterIdempotentWriteAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, IdempotentWrite))) - val topicReadAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read))) - val topicWriteAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write))) - val topicDescribeAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe))) - val topicAlterAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Alter))) - val topicDeleteAcl = Map(deleteTopicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete))) - val topicDescribeConfigsAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, DescribeConfigs))) - val topicAlterConfigsAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, AlterConfigs))) - val transactionIdWriteAcl = Map(transactionalIdResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write))) - val transactionalIdDescribeAcl = Map(transactionalIdResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe))) + val groupReadAcl = Map(groupResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read))) + val groupDescribeAcl = Map(groupResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe))) + val groupDeleteAcl = Map(groupResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete))) + val clusterAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, ClusterAction))) + val clusterCreateAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, 
Acl.WildCardHost, Create))) + val clusterAlterAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Alter))) + val clusterDescribeAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe))) + val clusterIdempotentWriteAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, IdempotentWrite))) + val topicReadAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read))) + val topicWriteAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write))) + val topicDescribeAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe))) + val topicAlterAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Alter))) + val topicDeleteAcl = Map(deleteTopicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete))) + val topicDescribeConfigsAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, DescribeConfigs))) + val topicAlterConfigsAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, AlterConfigs))) + val transactionIdWriteAcl = Map(transactionalIdResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write))) + val transactionalIdDescribeAcl = Map(transactionalIdResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe))) val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]() @@ -234,7 +235,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { override def setUp() { super.setUp() - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)), Resource.ClusterResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, ClusterAction)), Resource.ClusterResource) for (_ <- 0 until producerCount) producers += TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers), @@ -378,12 +379,12 @@ 
class AuthorizerIntegrationTest extends BaseRequestTest { private def createAclsRequest = new CreateAclsRequest.Builder( Collections.singletonList(new AclCreation(new AclBinding( new AdminResource(AdminResourceType.TOPIC, "mytopic"), - new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.DENY))))).build() + new AccessControlEntry(userPrincipal.toString, "*", AclOperation.WRITE, AclPermissionType.DENY))))).build() private def deleteAclsRequest = new DeleteAclsRequest.Builder( Collections.singletonList(new AclBindingFilter( new ResourceFilter(AdminResourceType.TOPIC, null), - new AccessControlEntryFilter("User:ANONYMOUS", "*", AclOperation.ANY, AclPermissionType.DENY)))).build() + new AccessControlEntryFilter(userPrincipal.toString, "*", AclOperation.ANY, AclPermissionType.DENY)))).build() private def alterReplicaLogDirsRequest = new AlterReplicaLogDirsRequest.Builder(Collections.singletonMap(tp, logDir)).build() @@ -524,7 +525,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testProduceWithTopicDescribe() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource) try { sendRecords(numRecords, tp) fail("should have thrown exception") @@ -536,7 +537,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testProduceWithTopicRead() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource) try { sendRecords(numRecords, tp) fail("should have thrown exception") @@ -548,7 +549,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testProduceWithTopicWrite() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + addAndVerifyAcls(Set(new 
Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource) sendRecords(numRecords, tp) } @@ -556,7 +557,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { def testCreatePermissionNeededForWritingToNonExistentTopic() { val newTopic = "newTopic" val topicPartition = new TopicPartition(newTopic, 0) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), new Resource(Topic, newTopic)) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), new Resource(Topic, newTopic)) try { sendRecords(numRecords, topicPartition) Assert.fail("should have thrown exception") @@ -564,13 +565,13 @@ class AuthorizerIntegrationTest extends BaseRequestTest { case e: TopicAuthorizationException => assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics()) } - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource) sendRecords(numRecords, topicPartition) } @Test(expected = classOf[TopicAuthorizationException]) def testConsumeUsingAssignWithNoAccess(): Unit = { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource) sendRecords(1, tp) removeAllAcls() this.consumers.head.assign(List(tp).asJava) @@ -579,11 +580,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testSimpleConsumeWithOffsetLookupAndNoGroupAccess(): Unit = { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource) sendRecords(1, tp) removeAllAcls() - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + 
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource) try { // note this still depends on group access because we haven't set offsets explicitly, which means // they will first be fetched from the consumer coordinator (which requires group access) @@ -597,11 +598,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testSimpleConsumeWithExplicitSeekAndNoGroupAccess(): Unit = { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource) sendRecords(1, tp) removeAllAcls() - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource) // in this case, we do an explicit seek, so there should be no need to query the coordinator at all this.consumers.head.assign(List(tp).asJava) @@ -611,11 +612,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test(expected = classOf[KafkaException]) def testConsumeWithoutTopicDescribeAccess() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource) sendRecords(1, tp) removeAllAcls() - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource) this.consumers.head.assign(List(tp).asJava) // the consumer should raise an exception if it receives UNKNOWN_TOPIC_OR_PARTITION @@ -625,12 +626,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testConsumeWithTopicDescribe() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + 
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource) sendRecords(1, tp) removeAllAcls() - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource) try { this.consumers.head.assign(List(tp).asJava) consumeRecords(this.consumers.head) @@ -642,12 +643,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testConsumeWithTopicWrite() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource) sendRecords(1, tp) removeAllAcls() - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource) try { this.consumers.head.assign(List(tp).asJava) consumeRecords(this.consumers.head) @@ -660,23 +661,23 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testConsumeWithTopicAndGroupRead() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource) sendRecords(1, tp) removeAllAcls() - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource) this.consumers.head.assign(List(tp).asJava) consumeRecords(this.consumers.head) } @Test def testPatternSubscriptionWithNoTopicAccess() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource) sendRecords(1, tp) removeAllAcls() - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource) this.consumers.head.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener) this.consumers.head.poll(50) assertTrue(this.consumers.head.subscription.isEmpty) @@ -684,12 +685,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testPatternSubscriptionWithTopicDescribeOnlyAndGroupRead() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource) sendRecords(1, tp) removeAllAcls() - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource) val consumer = consumers.head consumer.subscribe(Pattern.compile(topicPattern)) try { @@ -702,25 +703,25 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testPatternSubscriptionWithTopicAndGroupRead() { - 
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource) sendRecords(1, tp) // create an unmatched topic val unmatchedTopic = "unmatched" createTopic(unmatchedTopic) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), new Resource(Topic, unmatchedTopic)) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), new Resource(Topic, unmatchedTopic)) sendRecords(1, new TopicPartition(unmatchedTopic, part)) removeAllAcls() - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource) val consumer = consumers.head consumer.subscribe(Pattern.compile(topicPattern)) consumeRecords(consumer) // set the subscription pattern to an internal topic that the consumer has read permission to. 
Since // internal topics are not included, we should not be assigned any partitions from this topic - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), new Resource(Topic, + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), new Resource(Topic, GROUP_METADATA_TOPIC_NAME)) consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME)) consumer.poll(0) @@ -730,12 +731,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testPatternSubscriptionMatchingInternalTopic() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource) sendRecords(1, tp) removeAllAcls() - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource) val consumerConfig = new Properties consumerConfig.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false") @@ -748,7 +749,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { assertEquals(Set(topic).asJava, consumer.subscription) // now authorize the user for the internal topic and verify that we can subscribe - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), Resource(Topic, + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), Resource(Topic, GROUP_METADATA_TOPIC_NAME)) consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME)) consumer.poll(0) @@ -758,14 +759,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testPatternSubscriptionMatchingInternalTopicWithDescribeOnlyPermission() { - 
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource) sendRecords(1, tp) removeAllAcls() - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource) val internalTopicResource = new Resource(Topic, GROUP_METADATA_TOPIC_NAME) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), internalTopicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), internalTopicResource) val consumerConfig = new Properties consumerConfig.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false") @@ -784,12 +785,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testPatternSubscriptionNotMatchingInternalTopic() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource) sendRecords(1, tp) removeAllAcls() - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource) val consumerConfig = new Properties consumerConfig.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false") @@ -806,7 +807,7 @@ class AuthorizerIntegrationTest extends 
BaseRequestTest { val newTopic = "newTopic" val topicPartition = new TopicPartition(newTopic, 0) val newTopicResource = new Resource(Topic, newTopic) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), newTopicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), newTopicResource) addAndVerifyAcls(groupReadAcl(groupResource), groupResource) addAndVerifyAcls(clusterAcl(Resource.ClusterResource), Resource.ClusterResource) try { @@ -818,8 +819,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics()) } - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), newTopicResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), newTopicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource) sendRecords(numRecords, topicPartition) consumeRecords(this.consumers.head, topic = newTopic, part = 0) @@ -832,34 +833,34 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test(expected = classOf[KafkaException]) def testCommitWithNoTopicAccess() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource) this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava) } @Test(expected = classOf[TopicAuthorizationException]) def testCommitWithTopicWrite() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), groupResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource) this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava) } @Test(expected = classOf[TopicAuthorizationException]) def testCommitWithTopicDescribe() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource) this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava) } @Test(expected = classOf[GroupAuthorizationException]) def testCommitWithNoGroupAccess() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource) this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava) } @Test def testCommitWithTopicAndGroupRead() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource) this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava) } @@ -871,14 +872,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test(expected = classOf[GroupAuthorizationException]) def testOffsetFetchWithNoGroupAccess() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, 
Acl.WildCardHost, Read)), topicResource) this.consumers.head.assign(List(tp).asJava) this.consumers.head.position(tp) } @Test(expected = classOf[KafkaException]) def testOffsetFetchWithNoTopicAccess() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource) this.consumers.head.assign(List(tp).asJava) this.consumers.head.position(tp) } @@ -886,13 +887,13 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testFetchAllOffsetsTopicAuthorization() { val offset = 15L - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource) this.consumers.head.assign(List(tp).asJava) this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(offset)).asJava) removeAllAcls() - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource) // send offset fetch requests directly since the consumer does not expose an API to do so // note there's only one broker, so no need to lookup the group coordinator @@ -904,7 +905,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { assertTrue(offsetFetchResponse.responseData.isEmpty) // now add describe permission on the topic and verify that the offset can be fetched - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource) offsetFetchResponse = 
sendOffsetFetchRequest(offsetFetchRequest, anySocketServer) assertEquals(Errors.NONE, offsetFetchResponse.error) assertTrue(offsetFetchResponse.responseData.containsKey(tp)) @@ -913,16 +914,16 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testOffsetFetchTopicDescribe() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), groupResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), groupResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource) this.consumers.head.assign(List(tp).asJava) this.consumers.head.position(tp) } @Test def testOffsetFetchWithTopicAndGroupRead() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource) this.consumers.head.assign(List(tp).asJava) this.consumers.head.position(tp) } @@ -934,27 +935,27 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testListOffsetsWithTopicDescribe() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource) this.consumers.head.partitionsFor(topic) } @Test(expected = classOf[GroupAuthorizationException]) def testDescribeGroupApiWithNoGroupAcl() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource) 
AdminClient.createSimplePlaintext(brokerList).describeConsumerGroup(group) } @Test def testDescribeGroupApiWithGroupDescribe() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), groupResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), groupResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource) AdminClient.createSimplePlaintext(brokerList).describeConsumerGroup(group) } @Test def testDescribeGroupCliWithGroupDescribe() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), groupResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), groupResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource) val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group) val opts = new ConsumerGroupCommandOptions(cgcArgs) @@ -965,9 +966,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testDeleteGroupApiWithDeleteGroupAcl() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)), groupResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)), groupResource) this.consumers.head.assign(List(tp).asJava) 
this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava) val result = AdminClient.createSimplePlaintext(brokerList).deleteConsumerGroups(List(group)) @@ -976,8 +977,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testDeleteGroupApiWithNoDeleteGroupAcl() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource) this.consumers.head.assign(List(tp).asJava) this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava) val result = AdminClient.createSimplePlaintext(brokerList).deleteConsumerGroups(List(group)) @@ -1000,7 +1001,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testUnauthorizedDeleteTopicsWithDescribe() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), deleteTopicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), deleteTopicResource) val response = connectAndSend(deleteTopicsRequest, ApiKeys.DELETE_TOPICS) val version = ApiKeys.DELETE_TOPICS.latestVersion val deleteResponse = DeleteTopicsResponse.parse(response, version) @@ -1010,7 +1011,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testDeleteTopicsWithWildCardAuth() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)), new Resource(Topic, "*")) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)), new Resource(Topic, "*")) val response = connectAndSend(deleteTopicsRequest, ApiKeys.DELETE_TOPICS) val version = ApiKeys.DELETE_TOPICS.latestVersion val deleteResponse = DeleteTopicsResponse.parse(response, 
version) @@ -1028,7 +1029,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testUnauthorizedDeleteRecordsWithDescribe() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), deleteTopicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), deleteTopicResource) val response = connectAndSend(deleteRecordsRequest, ApiKeys.DELETE_RECORDS) val version = ApiKeys.DELETE_RECORDS.latestVersion val deleteRecordsResponse = DeleteRecordsResponse.parse(response, version) @@ -1037,7 +1038,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testDeleteRecordsWithWildCardAuth() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)), new Resource(Topic, "*")) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)), new Resource(Topic, "*")) val response = connectAndSend(deleteRecordsRequest, ApiKeys.DELETE_RECORDS) val version = ApiKeys.DELETE_RECORDS.latestVersion val deleteRecordsResponse = DeleteRecordsResponse.parse(response, version) @@ -1055,7 +1056,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testCreatePartitionsWithWildCardAuth() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Alter)), new Resource(Topic, "*")) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Alter)), new Resource(Topic, "*")) val response = connectAndSend(createPartitionsRequest, ApiKeys.CREATE_PARTITIONS) val version = ApiKeys.CREATE_PARTITIONS.latestVersion val createPartitionsResponse = CreatePartitionsResponse.parse(response, version) @@ -1064,7 +1065,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test(expected = classOf[TransactionalIdAuthorizationException]) def testTransactionalProducerInitTransactionsNoWriteTransactionalIdAcl(): Unit = { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), transactionalIdResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), transactionalIdResource) val producer = buildTransactionalProducer() producer.initTransactions() } @@ -1077,9 +1078,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testSendOffsetsWithNoConsumerGroupDescribeAccess(): Unit = { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)), Resource.ClusterResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, ClusterAction)), Resource.ClusterResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), transactionalIdResource) val producer = buildTransactionalProducer() producer.initTransactions() producer.beginTransaction() @@ -1093,8 +1094,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testSendOffsetsWithNoConsumerGroupWriteAccess(): Unit = { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), groupResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), transactionalIdResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), groupResource) val producer = buildTransactionalProducer() producer.initTransactions() producer.beginTransaction() @@ -1108,7 +1109,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testIdempotentProducerNoIdempotentWriteAclInInitProducerId(): Unit = { - 
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource) val producer = buildIdempotentProducer() try { // the InitProducerId is sent asynchronously, so we expect the error either in the callback @@ -1132,8 +1133,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testIdempotentProducerNoIdempotentWriteAclInProduce(): Unit = { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, IdempotentWrite)), Resource.ClusterResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, IdempotentWrite)), Resource.ClusterResource) val producer = buildIdempotentProducer() @@ -1142,7 +1143,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { // revoke the IdempotentWrite permission removeAllAcls() - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource) try { // the send should now fail with a cluster auth error @@ -1165,16 +1166,16 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def shouldInitTransactionsWhenAclSet(): Unit = { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), transactionalIdResource) val producer = buildTransactionalProducer() producer.initTransactions() } @Test def testTransactionalProducerTopicAuthorizationExceptionInSendCallback(): Unit = { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), 
transactionalIdResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), transactionalIdResource) // add describe access so that we can fetch metadata - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource) val producer = buildTransactionalProducer() producer.initTransactions() producer.beginTransaction() @@ -1194,9 +1195,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testTransactionalProducerTopicAuthorizationExceptionInCommit(): Unit = { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), transactionalIdResource) // add describe access so that we can fetch metadata - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource) val producer = buildTransactionalProducer() producer.initTransactions() producer.beginTransaction() @@ -1212,11 +1213,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessDuringSend(): Unit = { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), transactionalIdResource) val producer = buildTransactionalProducer() producer.initTransactions() removeAllAcls() - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource) try { producer.beginTransaction() producer.send(new 
ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes)).get @@ -1229,8 +1230,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessOnEndTransaction(): Unit = { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), transactionalIdResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource) val producer = buildTransactionalProducer() producer.initTransactions() producer.beginTransaction() @@ -1246,9 +1247,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def shouldSuccessfullyAbortTransactionAfterTopicAuthorizationException(): Unit = { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), new Resource(Topic, deleteTopic)) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), transactionalIdResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), new Resource(Topic, deleteTopic)) val producer = buildTransactionalProducer() producer.initTransactions() producer.beginTransaction() @@ -1266,8 +1267,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessOnSendOffsetsToTxn(): Unit = { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, 
Write)), transactionalIdResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), groupResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), transactionalIdResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), groupResource) val producer = buildTransactionalProducer() producer.initTransactions() producer.beginTransaction() @@ -1283,8 +1284,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def shouldSendSuccessfullyWhenIdempotentAndHasCorrectACL(): Unit = { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, IdempotentWrite)), Resource.ClusterResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, IdempotentWrite)), Resource.ClusterResource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource) val producer = buildIdempotentProducer() producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes)).get } diff --git a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala new file mode 100644 index 00000000000..27c3f31c3ff --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package kafka.api + +import java.util.Properties + +import kafka.api.GroupAuthorizerIntegrationTest._ +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs +import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder} + + +object GroupAuthorizerIntegrationTest { + val GroupPrincipalType = "Group" + val TestGroupPrincipal = new KafkaPrincipal(GroupPrincipalType, "testGroup") + class GroupPrincipalBuilder extends KafkaPrincipalBuilder { + override def build(context: AuthenticationContext): KafkaPrincipal = { + TestGroupPrincipal + } + } +} + +class GroupAuthorizerIntegrationTest extends AuthorizerIntegrationTest { + override val kafkaPrincipalType = GroupPrincipalType + override def userPrincipal = TestGroupPrincipal + + override def propertyOverrides(properties: Properties): Unit = { + properties.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, + classOf[GroupPrincipalBuilder].getName) + super.propertyOverrides(properties) + } +} \ No newline at end of file diff --git a/core/src/test/scala/integration/kafka/api/GroupEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/GroupEndToEndAuthorizationTest.scala new file mode 100644 index 00000000000..8dea0fcf15a --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/GroupEndToEndAuthorizationTest.scala @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.api + +import kafka.api.GroupEndToEndAuthorizationTest._ +import kafka.utils.JaasTestUtils +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs +import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder, SaslAuthenticationContext} + +object GroupEndToEndAuthorizationTest { + val GroupPrincipalType = "Group" + val ClientGroup = "testGroup" + class GroupPrincipalBuilder extends KafkaPrincipalBuilder { + override def build(context: AuthenticationContext): KafkaPrincipal = { + context match { + case ctx: SaslAuthenticationContext => + if (ctx.server.getAuthorizationID == JaasTestUtils.KafkaScramUser) + new KafkaPrincipal(GroupPrincipalType, ClientGroup) + else + new KafkaPrincipal(GroupPrincipalType, ctx.server.getAuthorizationID) + case _ => + KafkaPrincipal.ANONYMOUS + } + } + } +} + +class GroupEndToEndAuthorizationTest extends SaslScramSslEndToEndAuthorizationTest { + override val kafkaPrincipalType = GroupPrincipalType + override val clientPrincipal = ClientGroup + this.serverConfig.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[GroupPrincipalBuilder].getName) +} diff --git a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala 
b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala index d304ffc1cae..dd1e705b030 100644 --- a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala @@ -28,7 +28,6 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes override protected def kafkaServerSaslMechanisms = ScramMechanism.mechanismNames.asScala.toList override val clientPrincipal = JaasTestUtils.KafkaScramUser override val kafkaPrincipal = JaasTestUtils.KafkaScramAdmin - private val clientPassword = JaasTestUtils.KafkaScramPassword private val kafkaPassword = JaasTestUtils.KafkaScramAdminPassword override def configureSecurityBeforeServersStart() { @@ -42,7 +41,7 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes override def setUp() { super.setUp() // Create client credentials after starting brokers so that dynamic credential creation is also tested - createScramCredentials(zkConnect, clientPrincipal, clientPassword) + createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword) createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser2, JaasTestUtils.KafkaScramPassword2) } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add authorization tests for custom principal types > -------------------------------------------------- > > Key: KAFKA-6912 > URL: https://issues.apache.org/jira/browse/KAFKA-6912 > Project: Kafka > Issue Type: Task > Components: core > Reporter: Rajini Sivaram > Assignee: Rajini Sivaram > Priority: Major > Fix For: 2.0.0 > > > KIP-290 proposes to add prefixed-wildcarded principals to enable ACLs to be > configured for groups of principals. This doesn't work with all security > protocols - e.g. SSL principals are of the format CN=name,O=org,C=country where > prefixes don't fit in terms of grouping. Kafka currently doesn't support the > concept of user groups, but it is possible to use custom > KafkaPrincipalBuilders to generate group principals during authentication. By > default, Kafka generates principals of type User, but custom types (e.g. > Group) are supported. This does currently have the restriction that ACLs may be > defined only at group level (cannot combine both user & group level ACLs for > a connection), but it works currently for all security protocols. > We don't have any tests that verify custom principal types and authorization > based on custom principal types. It would be good to add some tests. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)