[ 
https://issues.apache.org/jira/browse/KAFKA-6726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16503117#comment-16503117
 ] 

ASF GitHub Bot commented on KAFKA-6726:
---------------------------------------

ijuma closed pull request #4795: KAFKA-6726: Fine Grained ACL for CreateTopics 
(KIP-277)
URL: https://github.com/apache/kafka/pull/4795
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
index b1504b1acdf..54d33a526a7 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
@@ -73,7 +73,7 @@
      *
      * REQUEST_TIMED_OUT(7)
      * INVALID_TOPIC_EXCEPTION(17)
-     * CLUSTER_AUTHORIZATION_FAILED(31)
+     * TOPIC_AUTHORIZATION_FAILED(29)
      * TOPIC_ALREADY_EXISTS(36)
      * INVALID_PARTITIONS(37)
      * INVALID_REPLICATION_FACTOR(38)
@@ -81,6 +81,7 @@
      * INVALID_CONFIG(40)
      * NOT_CONTROLLER(41)
      * INVALID_REQUEST(42)
+     * POLICY_VIOLATION(44)
      */
 
     private final Map<String, ApiError> errors;
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala 
b/core/src/main/scala/kafka/admin/AclCommand.scala
index 6dd227223e0..4409a187ae2 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -31,7 +31,7 @@ object AclCommand extends Logging {
 
   val Newline = scala.util.Properties.lineSeparator
   val ResourceTypeToValidOperations = Map[ResourceType, Set[Operation]] (
-    Topic -> Set(Read, Write, Describe, Delete, DescribeConfigs, AlterConfigs, 
All),
+    Topic -> Set(Read, Write, Create, Describe, Delete, DescribeConfigs, 
AlterConfigs, All),
     Group -> Set(Read, Describe, Delete, All),
     Cluster -> Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, 
IdempotentWrite, Alter, Describe, All),
     TransactionalId -> Set(Describe, Write, All),
@@ -153,13 +153,16 @@ object AclCommand extends Logging {
     val transactionalIds: Set[Resource] = 
getResource(opts).filter(_.resourceType == TransactionalId)
     val enableIdempotence = opts.options.has(opts.idempotentOpt)
 
-    val acls = getAcl(opts, Set(Write, Describe))
+    val topicAcls = getAcl(opts, Set(Write, Describe, Create))
+    val transactionalIdAcls = getAcl(opts, Set(Write, Describe))
 
-    //Write, Describe permission on topics, Create permission on cluster, 
Write, Describe on transactionalIds
-    topics.map(_ -> acls).toMap[Resource, Set[Acl]] ++
-      transactionalIds.map(_ -> acls).toMap[Resource, Set[Acl]] +
-      (Resource.ClusterResource -> (getAcl(opts, Set(Create)) ++
-        (if (enableIdempotence) getAcl(opts, Set(IdempotentWrite)) else 
Set.empty[Acl])))
+    //Write, Describe, Create permission on topics, Write, Describe on 
transactionalIds
+    topics.map(_ -> topicAcls).toMap ++
+      transactionalIds.map(_ -> transactionalIdAcls).toMap ++
+        (if (enableIdempotence) 
+          Map(Resource.ClusterResource -> getAcl(opts, Set(IdempotentWrite))) 
+        else
+          Map.empty)
   }
 
   private def getConsumerResourceToAcls(opts: AclCommandOptions): 
Map[Resource, Set[Acl]] = {
@@ -168,12 +171,12 @@ object AclCommand extends Logging {
     val topics: Set[Resource] = getResource(opts).filter(_.resourceType == 
Topic)
     val groups: Set[Resource] = resources.filter(_.resourceType == Group)
 
-    //Read,Describe on topic, Read on consumerGroup + Create on cluster
+    //Read, Describe on topic, Read on consumerGroup
 
     val acls = getAcl(opts, Set(Read, Describe))
 
-    topics.map(_ -> acls).toMap[Resource, Set[Acl]] ++
-      groups.map(_ -> getAcl(opts, Set(Read))).toMap[Resource, Set[Acl]]
+    topics.map(_ -> acls).toMap ++
+      groups.map(_ -> getAcl(opts, Set(Read))).toMap
   }
 
   private def getCliResourceToAcls(opts: AclCommandOptions): Map[Resource, 
Set[Acl]] = {
@@ -355,7 +358,7 @@ object AclCommand extends Logging {
       .ofType(classOf[String])
 
     val producerOpt = parser.accepts("producer", "Convenience option to 
add/remove ACLs for producer role. " +
-      "This will generate ACLs that allows WRITE,DESCRIBE on topic and CREATE 
on cluster. ")
+      "This will generate ACLs that allows WRITE,DESCRIBE and CREATE on 
topic.")
 
     val consumerOpt = parser.accepts("consumer", "Convenience option to 
add/remove ACLs for consumer role. " +
       "This will generate ACLs that allows READ,DESCRIBE on topic and READ on 
group.")
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 9f1ab62f03d..98672c8287a 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -62,6 +62,7 @@ import scala.collection.JavaConverters._
 import scala.collection._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.{Failure, Success, Try}
+import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails
 
 /**
  * Logic to handle the various Kafka requests
@@ -1040,8 +1041,10 @@ class KafkaApis(val requestChannel: RequestChannel,
       val nonExistingTopics = 
metadataCache.getNonExistingTopics(authorizedTopics)
       if (metadataRequest.allowAutoTopicCreation && 
config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) {
         if (!authorize(request.session, Create, Resource.ClusterResource)) {
-          authorizedTopics --= nonExistingTopics
-          unauthorizedForCreateTopics ++= nonExistingTopics
+          unauthorizedForCreateTopics = nonExistingTopics.filter { topic =>
+            !authorize(request.session, Create, new Resource(Topic, topic))
+          }
+          authorizedTopics --= unauthorizedForCreateTopics
         }
       }
     }
@@ -1424,16 +1427,20 @@ class KafkaApis(val requestChannel: RequestChannel,
         (topic, new ApiError(Errors.NOT_CONTROLLER, null))
       }
       sendResponseCallback(results)
-    } else if (!authorize(request.session, Create, Resource.ClusterResource)) {
-      val results = createTopicsRequest.topics.asScala.map { case (topic, _) =>
-        (topic, new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, null))
-      }
-      sendResponseCallback(results)
     } else {
       val (validTopics, duplicateTopics) = 
createTopicsRequest.topics.asScala.partition { case (topic, _) =>
         !createTopicsRequest.duplicateTopics.contains(topic)
       }
 
+      val (authorizedTopics, unauthorizedTopics) =
+        if (authorize(request.session, Create, Resource.ClusterResource)) {
+          (validTopics, Map[String, TopicDetails]())
+        } else {
+          validTopics.partition { case (topic, _) =>
+            authorize(request.session, Create, new Resource(Topic, topic))
+          }
+        }
+
       // Special handling to add duplicate topics to the response
       def sendResponseWithDuplicatesCallback(results: Map[String, ApiError]): 
Unit = {
 
@@ -1447,14 +1454,15 @@ class KafkaApis(val requestChannel: RequestChannel,
             duplicateTopics.keySet.map((_, new 
ApiError(Errors.INVALID_REQUEST, errorMessage))).toMap
           } else Map.empty
 
-        val completeResults = results ++ duplicatedTopicsResults
+        val unauthorizedTopicsResults = unauthorizedTopics.keySet.map(_ -> new 
ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null)) 
+        val completeResults = results ++ duplicatedTopicsResults ++ 
unauthorizedTopicsResults
         sendResponseCallback(completeResults)
       }
 
       adminManager.createTopics(
         createTopicsRequest.timeout,
         createTopicsRequest.validateOnly,
-        validTopics,
+        authorizedTopics,
         sendResponseWithDuplicatesCallback
       )
     }
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 4af9b83f3b8..ea5a155b5dc 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -17,6 +17,7 @@ import java.util
 import java.util.concurrent.ExecutionException
 import java.util.regex.Pattern
 import java.util.{ArrayList, Collections, Properties}
+import java.time.Duration
 
 import kafka.admin.AdminClient
 import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, 
KafkaConsumerGroupService}
@@ -73,6 +74,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   val groupResource = new Resource(Group, group)
   val deleteTopicResource = new Resource(Topic, deleteTopic)
   val transactionalIdResource = new Resource(TransactionalId, transactionalId)
+  val createTopicResource = new Resource(Topic, createTopic)
 
   val groupReadAcl = Map(groupResource -> Set(new Acl(userPrincipal, Allow, 
Acl.WildCardHost, Read)))
   val groupDescribeAcl = Map(groupResource -> Set(new Acl(userPrincipal, 
Allow, Acl.WildCardHost, Describe)))
@@ -82,6 +84,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   val clusterAlterAcl = Map(Resource.ClusterResource -> Set(new 
Acl(userPrincipal, Allow, Acl.WildCardHost, Alter)))
   val clusterDescribeAcl = Map(Resource.ClusterResource -> Set(new 
Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)))
   val clusterIdempotentWriteAcl = Map(Resource.ClusterResource -> Set(new 
Acl(userPrincipal, Allow, Acl.WildCardHost, IdempotentWrite)))
+  val topicCreateAcl = Map(createTopicResource -> Set(new Acl(userPrincipal, 
Allow, Acl.WildCardHost, Create)))
   val topicReadAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, 
Acl.WildCardHost, Read)))
   val topicWriteAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, 
Acl.WildCardHost, Write)))
   val topicDescribeAcl = Map(topicResource -> Set(new Acl(userPrincipal, 
Allow, Acl.WildCardHost, Describe)))
@@ -207,7 +210,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.LEADER_AND_ISR -> clusterAcl,
     ApiKeys.STOP_REPLICA -> clusterAcl,
     ApiKeys.CONTROLLED_SHUTDOWN -> clusterAcl,
-    ApiKeys.CREATE_TOPICS -> clusterCreateAcl,
+    ApiKeys.CREATE_TOPICS -> topicCreateAcl,
     ApiKeys.DELETE_TOPICS -> topicDeleteAcl,
     ApiKeys.DELETE_RECORDS -> topicDeleteAcl,
     ApiKeys.OFFSET_FOR_LEADER_EPOCH -> clusterAcl,
@@ -492,6 +495,18 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     }
   }
 
+  @Test
+  def testCreateTopicAuthorizationWithClusterCreate() {
+    removeAllAcls()
+    val resources = Set[ResourceType](Topic)
+
+    sendRequestAndVerifyResponseError(ApiKeys.CREATE_TOPICS, 
createTopicsRequest, resources, isAuthorized = false)
+
+    for ((resource, acls) <- clusterCreateAcl)
+      addAndVerifyAcls(acls, resource)
+    sendRequestAndVerifyResponseError(ApiKeys.CREATE_TOPICS, 
createTopicsRequest, resources, isAuthorized = true)
+  }
+
   @Test
   def testFetchFollowerRequest() {
     val key = ApiKeys.FETCH
@@ -551,18 +566,30 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   @Test
-  def testCreatePermissionNeededForWritingToNonExistentTopic() {
-    val newTopic = "newTopic"
-    val topicPartition = new TopicPartition(newTopic, 0)
-    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), new Resource(Topic, newTopic))
+  def testCreatePermissionOnTopicToWriteToNonExistentTopic() {
+    testCreatePermissionNeededToWriteToNonExistentTopic(Topic)
+  }
+
+  @Test
+  def testCreatePermissionOnClusterToWriteToNonExistentTopic() {
+    testCreatePermissionNeededToWriteToNonExistentTopic(Cluster)
+  }
+
+  private def testCreatePermissionNeededToWriteToNonExistentTopic(resType: 
ResourceType) {
+    val topicPartition = new TopicPartition(createTopic, 0)
+    val newTopicResource = new Resource(Topic, createTopic)
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), newTopicResource)
     try {
       sendRecords(numRecords, topicPartition)
       Assert.fail("should have thrown exception")
     } catch {
-      case e: TopicAuthorizationException => 
assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics())
+      case e: TopicAuthorizationException => 
+        assertEquals(Collections.singleton(createTopic), 
e.unauthorizedTopics())
     }
 
-    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Create)), Resource.ClusterResource)
+    val resource = if (resType == Topic) newTopicResource else 
Resource.ClusterResource
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Create)), resource)
+
     sendRecords(numRecords, topicPartition)
   }
 
@@ -800,27 +827,37 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 }
 
   @Test
-  def testCreatePermissionNeededToReadFromNonExistentTopic() {
-    val newTopic = "newTopic"
+  def testCreatePermissionOnTopicToReadFromNonExistentTopic() {
+    testCreatePermissionNeededToReadFromNonExistentTopic("newTopic",
+      Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)),
+      Topic)
+  }
+
+  @Test
+  def testCreatePermissionOnClusterToReadFromNonExistentTopic() {
+    testCreatePermissionNeededToReadFromNonExistentTopic("newTopic",
+      Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), 
+      Cluster)
+  }
+
+  private def testCreatePermissionNeededToReadFromNonExistentTopic(newTopic: 
String, acls: Set[Acl], resType: ResourceType) {
     val topicPartition = new TopicPartition(newTopic, 0)
     val newTopicResource = new Resource(Topic, newTopic)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), newTopicResource)
     addAndVerifyAcls(groupReadAcl(groupResource), groupResource)
-    addAndVerifyAcls(clusterAcl(Resource.ClusterResource), 
Resource.ClusterResource)
-    try {
-      this.consumers.head.assign(List(topicPartition).asJava)
-      consumeRecords(this.consumers.head)
-      Assert.fail("should have thrown exception")
-    } catch {
-      case e: TopicAuthorizationException =>
-        assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics())
-    }
+    this.consumers.head.assign(List(topicPartition).asJava)
+    val unauthorizedTopics = intercept[TopicAuthorizationException] {
+      (0 until 10).foreach(_ => consumers.head.poll(Duration.ofMillis(50L)))
+    }.unauthorizedTopics
+    assertEquals(Collections.singleton(newTopic), unauthorizedTopics)
 
-    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Write)), newTopicResource)
-    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Create)), Resource.ClusterResource)
+    val resource = if (resType == Topic) newTopicResource else 
Resource.ClusterResource
+    addAndVerifyAcls(acls, resource)
 
-    sendRecords(numRecords, topicPartition)
-    consumeRecords(this.consumers.head, topic = newTopic, part = 0)
+    TestUtils.waitUntilTrue(() => {
+      this.consumers.head.poll(Duration.ofMillis(50L))
+      this.zkClient.topicExists(newTopic)
+    }, "Expected topic was not created")
   }
 
   @Test(expected = classOf[AuthorizationException])
diff --git 
a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala 
b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 4189ce3610b..c81b32d121b 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -60,7 +60,7 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
   override val serverCount = 3
 
   override def configureSecurityBeforeServersStart() {
-    AclCommand.main(clusterAclArgs)
+    AclCommand.main(clusterActionArgs)
     AclCommand.main(topicBrokerReadAclArgs)
   }
 
@@ -82,23 +82,20 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
   val wildcardTopicResource = new Resource(Topic, wildcard)
   val wildcardGroupResource = new Resource(Group, wildcard)
 
-  // Arguments to AclCommand to set ACLs. There are three definitions here:
-  // 1- Provides read and write access to topic
-  // 2- Provides only write access to topic
-  // 3- Provides read access to consumer group
-  def clusterAclArgs: Array[String] = Array("--authorizer-properties",
-                                            s"zookeeper.connect=$zkConnect",
-                                            s"--add",
-                                            s"--cluster",
-                                            s"--operation=ClusterAction",
-                                            
s"--allow-principal=$kafkaPrincipalType:$kafkaPrincipal")
+  // Arguments to AclCommand to set ACLs.
+  def clusterActionArgs: Array[String] = Array("--authorizer-properties",
+                                          s"zookeeper.connect=$zkConnect",
+                                          s"--add",
+                                          s"--cluster",
+                                          s"--operation=ClusterAction",
+                                          
s"--allow-principal=$kafkaPrincipalType:$kafkaPrincipal")
   def topicBrokerReadAclArgs: Array[String] = Array("--authorizer-properties",
-                                                    
s"zookeeper.connect=$zkConnect",
-                                                    s"--add",
-                                                    s"--topic=$wildcard",
-                                                    s"--operation=Read",
-                                                    
s"--allow-principal=$kafkaPrincipalType:$kafkaPrincipal")
-  def produceAclArgs: Array[String] = Array("--authorizer-properties",
+                                          s"zookeeper.connect=$zkConnect",
+                                          s"--add",
+                                          s"--topic=$wildcard",
+                                          s"--operation=Read",
+                                          
s"--allow-principal=$kafkaPrincipalType:$kafkaPrincipal")
+  def produceAclArgs(topic: String): Array[String] = 
Array("--authorizer-properties",
                                           s"zookeeper.connect=$zkConnect",
                                           s"--add",
                                           s"--topic=$topic",
@@ -124,13 +121,13 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
                                           s"--topic=$topic",
                                           s"--operation=Write",
                                           
s"--allow-principal=$kafkaPrincipalType:$clientPrincipal")
-  def consumeAclArgs: Array[String] = Array("--authorizer-properties",
-                                               s"zookeeper.connect=$zkConnect",
-                                               s"--add",
-                                               s"--topic=$topic",
-                                               s"--group=$group",
-                                               s"--consumer",
-                                               
s"--allow-principal=$kafkaPrincipalType:$clientPrincipal")
+  def consumeAclArgs(topic: String): Array[String] = 
Array("--authorizer-properties",
+                                          s"zookeeper.connect=$zkConnect",
+                                          s"--add",
+                                          s"--topic=$topic",
+                                          s"--group=$group",
+                                          s"--consumer",
+                                          
s"--allow-principal=$kafkaPrincipalType:$clientPrincipal")
   def groupAclArgs: Array[String] = Array("--authorizer-properties",
                                           s"zookeeper.connect=$zkConnect",
                                           s"--add",
@@ -138,13 +135,13 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
                                           s"--operation=Read",
                                           
s"--allow-principal=$kafkaPrincipalType:$clientPrincipal")
   def produceConsumeWildcardAclArgs: Array[String] = 
Array("--authorizer-properties",
-                                            s"zookeeper.connect=$zkConnect",
-                                            s"--add",
-                                            s"--topic=$wildcard",
-                                            s"--group=$wildcard",
-                                            s"--consumer",
-                                            s"--producer",
-                                            
s"--allow-principal=$kafkaPrincipalType:$clientPrincipal")
+                                          s"zookeeper.connect=$zkConnect",
+                                          s"--add",
+                                          s"--topic=$wildcard",
+                                          s"--group=$wildcard",
+                                          s"--consumer",
+                                          s"--producer",
+                                          
s"--allow-principal=$kafkaPrincipalType:$clientPrincipal")
 
   def ClusterActionAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, 
kafkaPrincipal), Allow, Acl.WildCardHost, ClusterAction))
   def TopicBrokerReadAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, 
kafkaPrincipal), Allow, Acl.WildCardHost, Read))
@@ -152,6 +149,7 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
   def TopicReadAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, 
clientPrincipal), Allow, Acl.WildCardHost, Read))
   def TopicWriteAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, 
clientPrincipal), Allow, Acl.WildCardHost, Write))
   def TopicDescribeAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, 
clientPrincipal), Allow, Acl.WildCardHost, Describe))
+  def TopicCreateAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, 
clientPrincipal), Allow, Acl.WildCardHost, Create))
   // The next two configuration parameters enable ZooKeeper secure ACLs
   // and sets the Kafka authorizer, both necessary to enable security.
   this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
@@ -160,6 +158,7 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
   this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
   this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, 
"3")
   this.serverConfig.setProperty(KafkaConfig.MinInSyncReplicasProp, "3")
+  this.serverConfig.setProperty(KafkaConfig.DefaultReplicationFactorProp, "3")
   this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group")
 
   /**
@@ -200,14 +199,14 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
     */
   @Test
   def testProduceConsumeViaAssign(): Unit = {
-    setAclsAndProduce()
+    setAclsAndProduce(tp)
     consumers.head.assign(List(tp).asJava)
     consumeRecords(this.consumers.head, numRecords)
   }
 
   @Test
   def testProduceConsumeViaSubscribe(): Unit = {
-    setAclsAndProduce()
+    setAclsAndProduce(tp)
     consumers.head.subscribe(List(topic).asJava)
     consumeRecords(this.consumers.head, numRecords)
   }
@@ -223,16 +222,25 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
   private def setWildcardResourceAcls() {
     AclCommand.main(produceConsumeWildcardAclArgs)
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ 
TopicDescribeAcl ++ TopicBrokerReadAcl, s.apis.authorizer.get, 
wildcardTopicResource)
+      TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ 
TopicDescribeAcl ++ TopicCreateAcl ++ TopicBrokerReadAcl, 
s.apis.authorizer.get, wildcardTopicResource)
       TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, 
wildcardGroupResource)
     }
   }
 
-  protected def setAclsAndProduce() {
-    AclCommand.main(produceAclArgs)
-    AclCommand.main(consumeAclArgs)
+  @Test
+  def testProduceConsumeTopicAutoCreateTopicCreateAcl(): Unit = {
+    // topic2 is not created on setup()
+    val tp2 = new TopicPartition("topic2", 0)
+    setAclsAndProduce(tp2)
+    consumers.head.assign(List(tp2).asJava)
+    consumeRecords(this.consumers.head, numRecords, topic = tp2.topic)
+  }
+
+  protected def setAclsAndProduce(tp: TopicPartition) {
+    AclCommand.main(produceAclArgs(tp.topic))
+    AclCommand.main(consumeAclArgs(tp.topic))
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ 
TopicDescribeAcl, s.apis.authorizer.get, topicResource)
+      TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ 
TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, new Resource(Topic, 
tp.topic))
       TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, 
groupResource)
     }
     sendRecords(numRecords, tp)
@@ -283,10 +291,10 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
   }
   
   private def noConsumeWithoutDescribeAclSetup(): Unit = {
-    AclCommand.main(produceAclArgs)
+    AclCommand.main(produceAclArgs(tp.topic))
     AclCommand.main(groupAclArgs)
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, 
s.apis.authorizer.get, topicResource)
+      TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ 
TopicCreateAcl, s.apis.authorizer.get, topicResource)
       TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, 
groupResource)
     }
 
@@ -328,10 +336,10 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
   }
   
   private def noConsumeWithDescribeAclSetup(): Unit = {
-    AclCommand.main(produceAclArgs)
+    AclCommand.main(produceAclArgs(tp.topic))
     AclCommand.main(groupAclArgs)
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, 
s.apis.authorizer.get, topicResource)
+      TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ 
TopicCreateAcl, s.apis.authorizer.get, topicResource)
       TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, 
groupResource)
     }
     sendRecords(numRecords, tp)
@@ -343,9 +351,9 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
     */
   @Test
   def testNoGroupAcl(): Unit = {
-    AclCommand.main(produceAclArgs)
+    AclCommand.main(produceAclArgs(tp.topic))
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, 
s.apis.authorizer.get, topicResource)
+      TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ 
TopicCreateAcl, s.apis.authorizer.get, topicResource)
     }
     sendRecords(numRecords, tp)
     consumers.head.assign(List(tp).asJava)
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala 
b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
index a5bf33171a4..643cd4ce6af 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
@@ -56,7 +56,7 @@ abstract class SaslEndToEndAuthorizationTest extends 
EndToEndAuthorizationTest {
     */
   @Test(timeout = 15000)
   def testTwoConsumersWithDifferentSaslCredentials(): Unit = {
-    setAclsAndProduce()
+    setAclsAndProduce(tp)
     val consumer1 = consumers.head
 
     val consumer2Config = new Properties
diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala 
b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
index 9b6272860ed..9197f79882f 100644
--- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
@@ -49,8 +49,8 @@ class AclCommandTest extends ZooKeeperTestHarness with 
Logging {
   )
 
   private val ResourceToOperations = Map[Set[Resource], (Set[Operation], 
Array[String])](
-    TopicResources -> (Set(Read, Write, Describe, Delete, DescribeConfigs, 
AlterConfigs),
-      Array("--operation", "Read" , "--operation", "Write", "--operation", 
"Describe", "--operation", "Delete",
+    TopicResources -> (Set(Read, Write, Create, Describe, Delete, 
DescribeConfigs, AlterConfigs),
+      Array("--operation", "Read" , "--operation", "Write", "--operation", 
"Create", "--operation", "Describe", "--operation", "Delete",
         "--operation", "DescribeConfigs", "--operation", "AlterConfigs")),
     Set(Resource.ClusterResource) -> (Set(Create, ClusterAction, 
DescribeConfigs, AlterConfigs, IdempotentWrite),
       Array("--operation", "Create", "--operation", "ClusterAction", 
"--operation", "DescribeConfigs",
@@ -61,10 +61,10 @@ class AclCommandTest extends ZooKeeperTestHarness with 
Logging {
   )
 
   private def ProducerResourceToAcls(enableIdempotence: Boolean = false) = 
Map[Set[Resource], Set[Acl]](
-    TopicResources -> AclCommand.getAcls(Users, Allow, Set(Write, Describe), 
Hosts),
+    TopicResources -> AclCommand.getAcls(Users, Allow, Set(Write, Describe, 
Create), Hosts),
     TransactionalIdResources -> AclCommand.getAcls(Users, Allow, Set(Write, 
Describe), Hosts),
-    Set(Resource.ClusterResource) -> AclCommand.getAcls(Users, Allow, 
Set(Some(Create),
-      if (enableIdempotence) Some(IdempotentWrite) else None).flatten, Hosts)
+    Set(Resource.ClusterResource) -> AclCommand.getAcls(Users, Allow, 
+      Set(if (enableIdempotence) Some(IdempotentWrite) else None).flatten, 
Hosts)
   )
 
   private val ConsumerResourceToAcls = Map[Set[Resource], Set[Acl]](
diff --git a/docs/security.html b/docs/security.html
index 06dd8fbd1fc..0ef37d75e92 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -1133,7 +1133,7 @@ <h4><a id="security_authz_cli" 
href="#security_authz_cli">Command Line Interface
         <tr>
             <td>--producer</td>
             <td> Convenience option to add/remove acls for producer role. This 
will generate acls that allows WRITE,
-                DESCRIBE on topic and CREATE on cluster.</td>
+                DESCRIBE and CREATE on topic.</td>
             <td></td>
             <td>Convenience</td>
         </tr>
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 532c8bc5728..4f1c5b34767 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -98,13 +98,10 @@ <h5><a id="upgrade_200_notable" 
href="#upgrade_200_notable">Notable changes in 2
         will be removed in a future version.</li>
     <li>The internal method 
<code>kafka.admin.AdminClient.deleteRecordsBefore</code> has been removed. 
Users are encouraged to migrate to 
<code>org.apache.kafka.clients.admin.AdminClient.deleteRecords</code>.</li>
     <li>The tool kafka.tools.ReplayLogProducer has been removed.</li>
-    <li><a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-176%3A+Remove+deprecated+new-consumer+option+for+tools">KIP-176</a>
 finally removes
-        the <code>--new-consumer</code> option for all consumer based tools as 
<code>kafka-console-consumer</code>, <code>kafka-consumer-perf-test</code>
-        and <code>kafka-consumer-groups</code>.
-        The new consumer is automatically used if the bootstrap servers list 
is provided on the command line
-        otherwise, when the zookeeper connection is provided, the old consumer 
is used.
-        The <code>--new-consumer</code> option had already been ignored as the 
way of selecting the consumer since Kafka 1.0.0,
-        this KIP just removes the option.
+    <li>The AclCommand tool <code>--producer</code> convenience option uses 
the <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-277+-+Fine+Grained+ACL+for+CreateTopics+API">KIP-277</a>
 finer grained ACL on the given topic. </li>
+    <li><a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-176%3A+Remove+deprecated+new-consumer+option+for+tools">KIP-176</a>
 removes
+        the <code>--new-consumer</code> option for all consumer based tools. 
This option is redundant since the new consumer is automatically
+        used if --bootstrap-server is defined.
     </li>
 </ul>
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KIP-277 - Fine Grained ACL for CreateTopics API
> -----------------------------------------------
>
>                 Key: KAFKA-6726
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6726
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core, tools
>            Reporter: Edoardo Comar
>            Assignee: Edoardo Comar
>            Priority: Major
>
> Issue to track the implementation of KIP-277:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-277+-+Fine+Grained+ACL+for+CreateTopics+API



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to