[ https://issues.apache.org/jira/browse/KAFKA-6912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499253#comment-16499253 ]

ASF GitHub Bot commented on KAFKA-6912:
---------------------------------------

lindong28 closed pull request #5030: KAFKA-6912: Add test for authorization with custom principal types
URL: https://github.com/apache/kafka/pull/5030
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a foreign pull request (one opened from a fork) once it
has been merged, the diff is reproduced below for the sake of provenance:

diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index da45be2e6be..1d9febfb334 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -56,6 +56,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   override def numBrokers: Int = 1
   val brokerId: Integer = 0
+  def userPrincipal = KafkaPrincipal.ANONYMOUS
 
   val topic = "topic"
   val topicPattern = "topic.*"
@@ -76,23 +77,23 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   val deleteTopicResource = new Resource(Topic, deleteTopic)
   val transactionalIdResource = new Resource(TransactionalId, transactionalId)
 
-  val groupReadAcl = Map(groupResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
-  val groupDescribeAcl = Map(groupResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
-  val groupDeleteAcl = Map(groupResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)))
-  val clusterAcl = Map(Resource.ClusterResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)))
-  val clusterCreateAcl = Map(Resource.ClusterResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)))
-  val clusterAlterAcl = Map(Resource.ClusterResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Alter)))
-  val clusterDescribeAcl = Map(Resource.ClusterResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
-  val clusterIdempotentWriteAcl = Map(Resource.ClusterResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, IdempotentWrite)))
-  val topicReadAcl = Map(topicResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
-  val topicWriteAcl = Map(topicResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)))
-  val topicDescribeAcl = Map(topicResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
-  val topicAlterAcl = Map(topicResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Alter)))
-  val topicDeleteAcl = Map(deleteTopicResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)))
-  val topicDescribeConfigsAcl = Map(topicResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, DescribeConfigs)))
-  val topicAlterConfigsAcl = Map(topicResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, AlterConfigs)))
-  val transactionIdWriteAcl = Map(transactionalIdResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)))
-  val transactionalIdDescribeAcl = Map(transactionalIdResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
+  val groupReadAcl = Map(groupResource -> Set(new Acl(userPrincipal, Allow, 
Acl.WildCardHost, Read)))
+  val groupDescribeAcl = Map(groupResource -> Set(new Acl(userPrincipal, 
Allow, Acl.WildCardHost, Describe)))
+  val groupDeleteAcl = Map(groupResource -> Set(new Acl(userPrincipal, Allow, 
Acl.WildCardHost, Delete)))
+  val clusterAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, 
Allow, Acl.WildCardHost, ClusterAction)))
+  val clusterCreateAcl = Map(Resource.ClusterResource -> Set(new 
Acl(userPrincipal, Allow, Acl.WildCardHost, Create)))
+  val clusterAlterAcl = Map(Resource.ClusterResource -> Set(new 
Acl(userPrincipal, Allow, Acl.WildCardHost, Alter)))
+  val clusterDescribeAcl = Map(Resource.ClusterResource -> Set(new 
Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)))
+  val clusterIdempotentWriteAcl = Map(Resource.ClusterResource -> Set(new 
Acl(userPrincipal, Allow, Acl.WildCardHost, IdempotentWrite)))
+  val topicReadAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, 
Acl.WildCardHost, Read)))
+  val topicWriteAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, 
Acl.WildCardHost, Write)))
+  val topicDescribeAcl = Map(topicResource -> Set(new Acl(userPrincipal, 
Allow, Acl.WildCardHost, Describe)))
+  val topicAlterAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, 
Acl.WildCardHost, Alter)))
+  val topicDeleteAcl = Map(deleteTopicResource -> Set(new Acl(userPrincipal, 
Allow, Acl.WildCardHost, Delete)))
+  val topicDescribeConfigsAcl = Map(topicResource -> Set(new 
Acl(userPrincipal, Allow, Acl.WildCardHost, DescribeConfigs)))
+  val topicAlterConfigsAcl = Map(topicResource -> Set(new Acl(userPrincipal, 
Allow, Acl.WildCardHost, AlterConfigs)))
+  val transactionIdWriteAcl = Map(transactionalIdResource -> Set(new 
Acl(userPrincipal, Allow, Acl.WildCardHost, Write)))
+  val transactionalIdDescribeAcl = Map(transactionalIdResource -> Set(new 
Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)))
 
 
   val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
@@ -234,7 +235,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   override def setUp() {
     super.setUp()
 
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, ClusterAction)), Resource.ClusterResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
ClusterAction)), Resource.ClusterResource)
 
     for (_ <- 0 until producerCount)
       producers += 
TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
@@ -378,12 +379,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   private def createAclsRequest = new CreateAclsRequest.Builder(
     Collections.singletonList(new AclCreation(new AclBinding(
       new AdminResource(AdminResourceType.TOPIC, "mytopic"),
-      new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, 
AclPermissionType.DENY))))).build()
+      new AccessControlEntry(userPrincipal.toString, "*", AclOperation.WRITE, 
AclPermissionType.DENY))))).build()
 
   private def deleteAclsRequest = new DeleteAclsRequest.Builder(
     Collections.singletonList(new AclBindingFilter(
       new ResourceFilter(AdminResourceType.TOPIC, null),
-      new AccessControlEntryFilter("User:ANONYMOUS", "*", AclOperation.ANY, 
AclPermissionType.DENY)))).build()
+      new AccessControlEntryFilter(userPrincipal.toString, "*", 
AclOperation.ANY, AclPermissionType.DENY)))).build()
 
   private def alterReplicaLogDirsRequest = new 
AlterReplicaLogDirsRequest.Builder(Collections.singletonMap(tp, logDir)).build()
 
@@ -524,7 +525,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testProduceWithTopicDescribe() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Describe)), topicResource)
     try {
       sendRecords(numRecords, tp)
       fail("should have thrown exception")
@@ -536,7 +537,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testProduceWithTopicRead() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), topicResource)
     try {
       sendRecords(numRecords, tp)
       fail("should have thrown exception")
@@ -548,7 +549,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testProduceWithTopicWrite() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), topicResource)
     sendRecords(numRecords, tp)
   }
 
@@ -556,7 +557,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   def testCreatePermissionNeededForWritingToNonExistentTopic() {
     val newTopic = "newTopic"
     val topicPartition = new TopicPartition(newTopic, 0)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), new Resource(Topic, newTopic))
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), new Resource(Topic, newTopic))
     try {
       sendRecords(numRecords, topicPartition)
       Assert.fail("should have thrown exception")
@@ -564,13 +565,13 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       case e: TopicAuthorizationException => 
assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics())
     }
 
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Create)), Resource.ClusterResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Create)), Resource.ClusterResource)
     sendRecords(numRecords, topicPartition)
   }
 
   @Test(expected = classOf[TopicAuthorizationException])
   def testConsumeUsingAssignWithNoAccess(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), topicResource)
     sendRecords(1, tp)
     removeAllAcls()
     this.consumers.head.assign(List(tp).asJava)
@@ -579,11 +580,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testSimpleConsumeWithOffsetLookupAndNoGroupAccess(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), topicResource)
     sendRecords(1, tp)
     removeAllAcls()
 
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), topicResource)
     try {
       // note this still depends on group access because we haven't set 
offsets explicitly, which means
       // they will first be fetched from the consumer coordinator (which 
requires group access)
@@ -597,11 +598,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testSimpleConsumeWithExplicitSeekAndNoGroupAccess(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), topicResource)
     sendRecords(1, tp)
     removeAllAcls()
 
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), topicResource)
 
     // in this case, we do an explicit seek, so there should be no need to 
query the coordinator at all
     this.consumers.head.assign(List(tp).asJava)
@@ -611,11 +612,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test(expected = classOf[KafkaException])
   def testConsumeWithoutTopicDescribeAccess() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), topicResource)
     sendRecords(1, tp)
     removeAllAcls()
 
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), groupResource)
     this.consumers.head.assign(List(tp).asJava)
 
     // the consumer should raise an exception if it receives 
UNKNOWN_TOPIC_OR_PARTITION
@@ -625,12 +626,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testConsumeWithTopicDescribe() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), topicResource)
     sendRecords(1, tp)
     removeAllAcls()
 
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), topicResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Describe)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), groupResource)
     try {
       this.consumers.head.assign(List(tp).asJava)
       consumeRecords(this.consumers.head)
@@ -642,12 +643,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testConsumeWithTopicWrite() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), topicResource)
     sendRecords(1, tp)
     removeAllAcls()
 
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), topicResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), groupResource)
     try {
       this.consumers.head.assign(List(tp).asJava)
       consumeRecords(this.consumers.head)
@@ -660,23 +661,23 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testConsumeWithTopicAndGroupRead() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), topicResource)
     sendRecords(1, tp)
     removeAllAcls()
 
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), topicResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), groupResource)
     this.consumers.head.assign(List(tp).asJava)
     consumeRecords(this.consumers.head)
   }
 
   @Test
   def testPatternSubscriptionWithNoTopicAccess() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), topicResource)
     sendRecords(1, tp)
     removeAllAcls()
 
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), groupResource)
     this.consumers.head.subscribe(Pattern.compile(topicPattern), new 
NoOpConsumerRebalanceListener)
     this.consumers.head.poll(50)
     assertTrue(this.consumers.head.subscription.isEmpty)
@@ -684,12 +685,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testPatternSubscriptionWithTopicDescribeOnlyAndGroupRead() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), topicResource)
     sendRecords(1, tp)
     removeAllAcls()
 
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), topicResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Describe)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), groupResource)
     val consumer = consumers.head
     consumer.subscribe(Pattern.compile(topicPattern))
     try {
@@ -702,25 +703,25 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testPatternSubscriptionWithTopicAndGroupRead() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), topicResource)
     sendRecords(1, tp)
 
     // create an unmatched topic
     val unmatchedTopic = "unmatched"
     createTopic(unmatchedTopic)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)),  new Resource(Topic, unmatchedTopic))
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)),  new Resource(Topic, unmatchedTopic))
     sendRecords(1, new TopicPartition(unmatchedTopic, part))
     removeAllAcls()
 
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), topicResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), groupResource)
     val consumer = consumers.head
     consumer.subscribe(Pattern.compile(topicPattern))
     consumeRecords(consumer)
 
     // set the subscription pattern to an internal topic that the consumer has 
read permission to. Since
     // internal topics are not included, we should not be assigned any 
partitions from this topic
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)),  new Resource(Topic,
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)),  new Resource(Topic,
       GROUP_METADATA_TOPIC_NAME))
     consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME))
     consumer.poll(0)
@@ -730,12 +731,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testPatternSubscriptionMatchingInternalTopic() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), topicResource)
     sendRecords(1, tp)
     removeAllAcls()
 
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), topicResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), groupResource)
 
     val consumerConfig = new Properties
     consumerConfig.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false")
@@ -748,7 +749,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       assertEquals(Set(topic).asJava, consumer.subscription)
 
       // now authorize the user for the internal topic and verify that we can 
subscribe
-      addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), Resource(Topic,
+      addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), Resource(Topic,
         GROUP_METADATA_TOPIC_NAME))
       consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME))
       consumer.poll(0)
@@ -758,14 +759,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testPatternSubscriptionMatchingInternalTopicWithDescribeOnlyPermission() 
{
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), topicResource)
     sendRecords(1, tp)
     removeAllAcls()
 
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), topicResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), groupResource)
     val internalTopicResource = new Resource(Topic, GROUP_METADATA_TOPIC_NAME)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), internalTopicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Describe)), internalTopicResource)
 
     val consumerConfig = new Properties
     consumerConfig.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false")
@@ -784,12 +785,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testPatternSubscriptionNotMatchingInternalTopic() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), topicResource)
     sendRecords(1, tp)
     removeAllAcls()
 
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), topicResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), groupResource)
 
     val consumerConfig = new Properties
     consumerConfig.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false")
@@ -806,7 +807,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val newTopic = "newTopic"
     val topicPartition = new TopicPartition(newTopic, 0)
     val newTopicResource = new Resource(Topic, newTopic)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), newTopicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), newTopicResource)
     addAndVerifyAcls(groupReadAcl(groupResource), groupResource)
     addAndVerifyAcls(clusterAcl(Resource.ClusterResource), 
Resource.ClusterResource)
     try {
@@ -818,8 +819,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
         assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics())
     }
 
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), newTopicResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Create)), Resource.ClusterResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), newTopicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Create)), Resource.ClusterResource)
 
     sendRecords(numRecords, topicPartition)
     consumeRecords(this.consumers.head, topic = newTopic, part = 0)
@@ -832,34 +833,34 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test(expected = classOf[KafkaException])
   def testCommitWithNoTopicAccess() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), groupResource)
     this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
   }
 
   @Test(expected = classOf[TopicAuthorizationException])
   def testCommitWithTopicWrite() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), groupResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), topicResource)
     this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
   }
 
   @Test(expected = classOf[TopicAuthorizationException])
   def testCommitWithTopicDescribe() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), groupResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Describe)), topicResource)
     this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
   }
 
   @Test(expected = classOf[GroupAuthorizationException])
   def testCommitWithNoGroupAccess() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), topicResource)
     this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
   }
 
   @Test
   def testCommitWithTopicAndGroupRead() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), groupResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), topicResource)
     this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
   }
 
@@ -871,14 +872,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test(expected = classOf[GroupAuthorizationException])
   def testOffsetFetchWithNoGroupAccess() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), topicResource)
     this.consumers.head.assign(List(tp).asJava)
     this.consumers.head.position(tp)
   }
 
   @Test(expected = classOf[KafkaException])
   def testOffsetFetchWithNoTopicAccess() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), groupResource)
     this.consumers.head.assign(List(tp).asJava)
     this.consumers.head.position(tp)
   }
@@ -886,13 +887,13 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   @Test
   def testFetchAllOffsetsTopicAuthorization() {
     val offset = 15L
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), groupResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), topicResource)
     this.consumers.head.assign(List(tp).asJava)
     this.consumers.head.commitSync(Map(tp -> new 
OffsetAndMetadata(offset)).asJava)
 
     removeAllAcls()
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), groupResource)
 
     // send offset fetch requests directly since the consumer does not expose 
an API to do so
     // note there's only one broker, so no need to lookup the group coordinator
@@ -904,7 +905,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     assertTrue(offsetFetchResponse.responseData.isEmpty)
 
     // now add describe permission on the topic and verify that the offset can 
be fetched
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Describe)), topicResource)
     offsetFetchResponse = sendOffsetFetchRequest(offsetFetchRequest, 
anySocketServer)
     assertEquals(Errors.NONE, offsetFetchResponse.error)
     assertTrue(offsetFetchResponse.responseData.containsKey(tp))
@@ -913,16 +914,16 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testOffsetFetchTopicDescribe() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), groupResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Describe)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Describe)), topicResource)
     this.consumers.head.assign(List(tp).asJava)
     this.consumers.head.position(tp)
   }
 
   @Test
   def testOffsetFetchWithTopicAndGroupRead() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), groupResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), topicResource)
     this.consumers.head.assign(List(tp).asJava)
     this.consumers.head.position(tp)
   }
@@ -934,27 +935,27 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testListOffsetsWithTopicDescribe() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Describe)), topicResource)
     this.consumers.head.partitionsFor(topic)
   }
 
   @Test(expected = classOf[GroupAuthorizationException])
   def testDescribeGroupApiWithNoGroupAcl() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Describe)), topicResource)
     AdminClient.createSimplePlaintext(brokerList).describeConsumerGroup(group)
   }
 
   @Test
   def testDescribeGroupApiWithGroupDescribe() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), groupResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Describe)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Describe)), topicResource)
     AdminClient.createSimplePlaintext(brokerList).describeConsumerGroup(group)
   }
 
   @Test
   def testDescribeGroupCliWithGroupDescribe() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), groupResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Describe)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Describe)), topicResource)
 
     val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", 
"--group", group)
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
@@ -965,9 +966,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testDeleteGroupApiWithDeleteGroupAcl() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), groupResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), topicResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Delete)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Delete)), groupResource)
     this.consumers.head.assign(List(tp).asJava)
     this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5, 
"")).asJava)
     val result = 
AdminClient.createSimplePlaintext(brokerList).deleteConsumerGroups(List(group))
@@ -976,8 +977,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testDeleteGroupApiWithNoDeleteGroupAcl() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), groupResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), topicResource)
     this.consumers.head.assign(List(tp).asJava)
     this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5, 
"")).asJava)
     val result = 
AdminClient.createSimplePlaintext(brokerList).deleteConsumerGroups(List(group))
@@ -1000,7 +1001,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testUnauthorizedDeleteTopicsWithDescribe() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), deleteTopicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Describe)), deleteTopicResource)
     val response = connectAndSend(deleteTopicsRequest, ApiKeys.DELETE_TOPICS)
     val version = ApiKeys.DELETE_TOPICS.latestVersion
     val deleteResponse = DeleteTopicsResponse.parse(response, version)
@@ -1010,7 +1011,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testDeleteTopicsWithWildCardAuth() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Delete)), new Resource(Topic, "*"))
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Delete)), new Resource(Topic, "*"))
     val response = connectAndSend(deleteTopicsRequest, ApiKeys.DELETE_TOPICS)
     val version = ApiKeys.DELETE_TOPICS.latestVersion
     val deleteResponse = DeleteTopicsResponse.parse(response, version)
@@ -1028,7 +1029,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testUnauthorizedDeleteRecordsWithDescribe() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), deleteTopicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Describe)), deleteTopicResource)
     val response = connectAndSend(deleteRecordsRequest, ApiKeys.DELETE_RECORDS)
     val version = ApiKeys.DELETE_RECORDS.latestVersion
     val deleteRecordsResponse = DeleteRecordsResponse.parse(response, version)
@@ -1037,7 +1038,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testDeleteRecordsWithWildCardAuth() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Delete)), new Resource(Topic, "*"))
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Delete)), new Resource(Topic, "*"))
     val response = connectAndSend(deleteRecordsRequest, ApiKeys.DELETE_RECORDS)
     val version = ApiKeys.DELETE_RECORDS.latestVersion
     val deleteRecordsResponse = DeleteRecordsResponse.parse(response, version)
@@ -1055,7 +1056,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testCreatePartitionsWithWildCardAuth() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Alter)), new Resource(Topic, "*"))
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Alter)), new Resource(Topic, "*"))
     val response = connectAndSend(createPartitionsRequest, 
ApiKeys.CREATE_PARTITIONS)
     val version = ApiKeys.CREATE_PARTITIONS.latestVersion
     val createPartitionsResponse = CreatePartitionsResponse.parse(response, 
version)
@@ -1064,7 +1065,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test(expected = classOf[TransactionalIdAuthorizationException])
   def testTransactionalProducerInitTransactionsNoWriteTransactionalIdAcl(): 
Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), transactionalIdResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Describe)), transactionalIdResource)
     val producer = buildTransactionalProducer()
     producer.initTransactions()
   }
@@ -1077,9 +1078,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testSendOffsetsWithNoConsumerGroupDescribeAccess(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, ClusterAction)), Resource.ClusterResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), topicResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), transactionalIdResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
ClusterAction)), Resource.ClusterResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), transactionalIdResource)
     val producer = buildTransactionalProducer()
     producer.initTransactions()
     producer.beginTransaction()
@@ -1093,8 +1094,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testSendOffsetsWithNoConsumerGroupWriteAccess(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), transactionalIdResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), transactionalIdResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Describe)), groupResource)
     val producer = buildTransactionalProducer()
     producer.initTransactions()
     producer.beginTransaction()
@@ -1108,7 +1109,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testIdempotentProducerNoIdempotentWriteAclInInitProducerId(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), topicResource)
     val producer = buildIdempotentProducer()
     try {
       // the InitProducerId is sent asynchronously, so we expect the error 
either in the callback
@@ -1132,8 +1133,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testIdempotentProducerNoIdempotentWriteAclInProduce(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), topicResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, IdempotentWrite)), Resource.ClusterResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
IdempotentWrite)), Resource.ClusterResource)
 
     val producer = buildIdempotentProducer()
 
@@ -1142,7 +1143,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
     // revoke the IdempotentWrite permission
     removeAllAcls()
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), topicResource)
 
     try {
       // the send should now fail with a cluster auth error
@@ -1165,16 +1166,16 @@ class AuthorizerIntegrationTest extends BaseRequestTest 
{
 
   @Test
   def shouldInitTransactionsWhenAclSet(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), transactionalIdResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), transactionalIdResource)
     val producer = buildTransactionalProducer()
     producer.initTransactions()
   }
 
   @Test
   def testTransactionalProducerTopicAuthorizationExceptionInSendCallback(): 
Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), transactionalIdResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), transactionalIdResource)
     // add describe access so that we can fetch metadata
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Describe)), topicResource)
     val producer = buildTransactionalProducer()
     producer.initTransactions()
     producer.beginTransaction()
@@ -1194,9 +1195,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testTransactionalProducerTopicAuthorizationExceptionInCommit(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), transactionalIdResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), transactionalIdResource)
     // add describe access so that we can fetch metadata
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Describe)), topicResource)
     val producer = buildTransactionalProducer()
     producer.initTransactions()
     producer.beginTransaction()
@@ -1212,11 +1213,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest 
{
 
   @Test
   def 
shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessDuringSend():
 Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), transactionalIdResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), transactionalIdResource)
     val producer = buildTransactionalProducer()
     producer.initTransactions()
     removeAllAcls()
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), topicResource)
     try {
       producer.beginTransaction()
       producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, 
"1".getBytes)).get
@@ -1229,8 +1230,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def 
shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessOnEndTransaction():
 Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), transactionalIdResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), transactionalIdResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), topicResource)
     val producer = buildTransactionalProducer()
     producer.initTransactions()
     producer.beginTransaction()
@@ -1246,9 +1247,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def shouldSuccessfullyAbortTransactionAfterTopicAuthorizationException(): 
Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), transactionalIdResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), topicResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), new Resource(Topic, deleteTopic))
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), transactionalIdResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Describe)), new Resource(Topic, deleteTopic))
     val producer = buildTransactionalProducer()
     producer.initTransactions()
     producer.beginTransaction()
@@ -1266,8 +1267,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def 
shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessOnSendOffsetsToTxn():
 Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), transactionalIdResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), groupResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), transactionalIdResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), groupResource)
     val producer = buildTransactionalProducer()
     producer.initTransactions()
     producer.beginTransaction()
@@ -1283,8 +1284,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def shouldSendSuccessfullyWhenIdempotentAndHasCorrectACL(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, IdempotentWrite)), Resource.ClusterResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
IdempotentWrite)), Resource.ClusterResource)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), topicResource)
     val producer = buildIdempotentProducer()
     producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, 
"1".getBytes)).get
   }
diff --git 
a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
new file mode 100644
index 00000000000..27c3f31c3ff
--- /dev/null
+++ 
b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package kafka.api
+
+import java.util.Properties
+
+import kafka.api.GroupAuthorizerIntegrationTest._
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
+import org.apache.kafka.common.security.auth.{AuthenticationContext, 
KafkaPrincipal, KafkaPrincipalBuilder}
+
+
+object GroupAuthorizerIntegrationTest {
+  val GroupPrincipalType = "Group"
+  val TestGroupPrincipal = new KafkaPrincipal(GroupPrincipalType, "testGroup")
+  class GroupPrincipalBuilder extends KafkaPrincipalBuilder {
+    override def build(context: AuthenticationContext): KafkaPrincipal = {
+      TestGroupPrincipal
+    }
+  }
+}
+
+class GroupAuthorizerIntegrationTest extends AuthorizerIntegrationTest {
+  override val kafkaPrincipalType = GroupPrincipalType
+  override def userPrincipal = TestGroupPrincipal
+
+  override def propertyOverrides(properties: Properties): Unit = {
+    
properties.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
+      classOf[GroupPrincipalBuilder].getName)
+    super.propertyOverrides(properties)
+  }
+}
\ No newline at end of file
diff --git 
a/core/src/test/scala/integration/kafka/api/GroupEndToEndAuthorizationTest.scala
 
b/core/src/test/scala/integration/kafka/api/GroupEndToEndAuthorizationTest.scala
new file mode 100644
index 00000000000..8dea0fcf15a
--- /dev/null
+++ 
b/core/src/test/scala/integration/kafka/api/GroupEndToEndAuthorizationTest.scala
@@ -0,0 +1,46 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.api
+
+import kafka.api.GroupEndToEndAuthorizationTest._
+import kafka.utils.JaasTestUtils
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
+import org.apache.kafka.common.security.auth.{AuthenticationContext, 
KafkaPrincipal, KafkaPrincipalBuilder, SaslAuthenticationContext}
+
+object GroupEndToEndAuthorizationTest {
+  val GroupPrincipalType = "Group"
+  val ClientGroup = "testGroup"
+  class GroupPrincipalBuilder extends KafkaPrincipalBuilder {
+    override def build(context: AuthenticationContext): KafkaPrincipal = {
+      context match {
+        case ctx: SaslAuthenticationContext =>
+          if (ctx.server.getAuthorizationID == JaasTestUtils.KafkaScramUser)
+            new KafkaPrincipal(GroupPrincipalType, ClientGroup)
+          else
+            new KafkaPrincipal(GroupPrincipalType, 
ctx.server.getAuthorizationID)
+        case _ =>
+          KafkaPrincipal.ANONYMOUS
+      }
+    }
+  }
+}
+
+class GroupEndToEndAuthorizationTest extends 
SaslScramSslEndToEndAuthorizationTest {
+  override val kafkaPrincipalType = GroupPrincipalType
+  override val clientPrincipal = ClientGroup
+  
this.serverConfig.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
 classOf[GroupPrincipalBuilder].getName)
+}
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
index d304ffc1cae..dd1e705b030 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
@@ -28,7 +28,6 @@ class SaslScramSslEndToEndAuthorizationTest extends 
SaslEndToEndAuthorizationTes
   override protected def kafkaServerSaslMechanisms = 
ScramMechanism.mechanismNames.asScala.toList
   override val clientPrincipal = JaasTestUtils.KafkaScramUser
   override val kafkaPrincipal = JaasTestUtils.KafkaScramAdmin
-  private val clientPassword = JaasTestUtils.KafkaScramPassword
   private val kafkaPassword = JaasTestUtils.KafkaScramAdminPassword
 
   override def configureSecurityBeforeServersStart() {
@@ -42,7 +41,7 @@ class SaslScramSslEndToEndAuthorizationTest extends 
SaslEndToEndAuthorizationTes
   override def setUp() {
     super.setUp()
     // Create client credentials after starting brokers so that dynamic 
credential creation is also tested
-    createScramCredentials(zkConnect, clientPrincipal, clientPassword)
+    createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser, 
JaasTestUtils.KafkaScramPassword)
     createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser2, 
JaasTestUtils.KafkaScramPassword2)
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add authorization tests for custom principal types
> --------------------------------------------------
>
>                 Key: KAFKA-6912
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6912
>             Project: Kafka
>          Issue Type: Task
>          Components: core
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Major
>             Fix For: 2.0.0
>
>
> KIP-290 proposes to add prefixed-wildcarded principals to enable ACLs to be 
> configured for groups of principals. This doesn't work with all security 
> protocols - e.g. SSL principals are of format CN=name,O=org,C=country where 
> prefixes don't fit in terms of grouping. Kafka currently doesn't support the 
> concept of user groups, but it is possible to use custom 
> KafkaPrincipalBuilders to generate group principals during authentication. By 
> default, Kafka generates principals of type User, but custom types (e.g. 
> Group) are supported. This does currently have the restriction that ACLs may be 
> defined only at group level (cannot combine both user & group level ACLs for 
> a connection), but it works currently for all security protocols.
> We don't have any tests that verify custom principal types and authorization 
> based on custom principal types. It will be good to add some tests.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to