[ https://issues.apache.org/jira/browse/KAFKA-6726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16503117#comment-16503117 ]
ASF GitHub Bot commented on KAFKA-6726: --------------------------------------- ijuma closed pull request #4795: KAFKA-6726: Fine Grained ACL for CreateTopics (KIP-277) URL: https://github.com/apache/kafka/pull/4795 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java index b1504b1acdf..54d33a526a7 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java @@ -73,7 +73,7 @@ * * REQUEST_TIMED_OUT(7) * INVALID_TOPIC_EXCEPTION(17) - * CLUSTER_AUTHORIZATION_FAILED(31) + * TOPIC_AUTHORIZATION_FAILED(29) * TOPIC_ALREADY_EXISTS(36) * INVALID_PARTITIONS(37) * INVALID_REPLICATION_FACTOR(38) @@ -81,6 +81,7 @@ * INVALID_CONFIG(40) * NOT_CONTROLLER(41) * INVALID_REQUEST(42) + * POLICY_VIOLATION(44) */ private final Map<String, ApiError> errors; diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala index 6dd227223e0..4409a187ae2 100644 --- a/core/src/main/scala/kafka/admin/AclCommand.scala +++ b/core/src/main/scala/kafka/admin/AclCommand.scala @@ -31,7 +31,7 @@ object AclCommand extends Logging { val Newline = scala.util.Properties.lineSeparator val ResourceTypeToValidOperations = Map[ResourceType, Set[Operation]] ( - Topic -> Set(Read, Write, Describe, Delete, DescribeConfigs, AlterConfigs, All), + Topic -> Set(Read, Write, Create, Describe, Delete, DescribeConfigs, AlterConfigs, All), Group -> Set(Read, Describe, Delete, All), Cluster -> Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, 
Alter, Describe, All), TransactionalId -> Set(Describe, Write, All), @@ -153,13 +153,16 @@ object AclCommand extends Logging { val transactionalIds: Set[Resource] = getResource(opts).filter(_.resourceType == TransactionalId) val enableIdempotence = opts.options.has(opts.idempotentOpt) - val acls = getAcl(opts, Set(Write, Describe)) + val topicAcls = getAcl(opts, Set(Write, Describe, Create)) + val transactionalIdAcls = getAcl(opts, Set(Write, Describe)) - //Write, Describe permission on topics, Create permission on cluster, Write, Describe on transactionalIds - topics.map(_ -> acls).toMap[Resource, Set[Acl]] ++ - transactionalIds.map(_ -> acls).toMap[Resource, Set[Acl]] + - (Resource.ClusterResource -> (getAcl(opts, Set(Create)) ++ - (if (enableIdempotence) getAcl(opts, Set(IdempotentWrite)) else Set.empty[Acl]))) + //Write, Describe, Create permission on topics, Write, Describe on transactionalIds + topics.map(_ -> topicAcls).toMap ++ + transactionalIds.map(_ -> transactionalIdAcls).toMap ++ + (if (enableIdempotence) + Map(Resource.ClusterResource -> getAcl(opts, Set(IdempotentWrite))) + else + Map.empty) } private def getConsumerResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = { @@ -168,12 +171,12 @@ object AclCommand extends Logging { val topics: Set[Resource] = getResource(opts).filter(_.resourceType == Topic) val groups: Set[Resource] = resources.filter(_.resourceType == Group) - //Read,Describe on topic, Read on consumerGroup + Create on cluster + //Read, Describe on topic, Read on consumerGroup val acls = getAcl(opts, Set(Read, Describe)) - topics.map(_ -> acls).toMap[Resource, Set[Acl]] ++ - groups.map(_ -> getAcl(opts, Set(Read))).toMap[Resource, Set[Acl]] + topics.map(_ -> acls).toMap ++ + groups.map(_ -> getAcl(opts, Set(Read))).toMap } private def getCliResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = { @@ -355,7 +358,7 @@ object AclCommand extends Logging { .ofType(classOf[String]) val producerOpt = 
parser.accepts("producer", "Convenience option to add/remove ACLs for producer role. " + - "This will generate ACLs that allows WRITE,DESCRIBE on topic and CREATE on cluster. ") + "This will generate ACLs that allows WRITE,DESCRIBE and CREATE on topic.") val consumerOpt = parser.accepts("consumer", "Convenience option to add/remove ACLs for consumer role. " + "This will generate ACLs that allows READ,DESCRIBE on topic and READ on group.") diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 9f1ab62f03d..98672c8287a 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -62,6 +62,7 @@ import scala.collection.JavaConverters._ import scala.collection._ import scala.collection.mutable.ArrayBuffer import scala.util.{Failure, Success, Try} +import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails /** * Logic to handle the various Kafka requests @@ -1040,8 +1041,10 @@ class KafkaApis(val requestChannel: RequestChannel, val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics) if (metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) { if (!authorize(request.session, Create, Resource.ClusterResource)) { - authorizedTopics --= nonExistingTopics - unauthorizedForCreateTopics ++= nonExistingTopics + unauthorizedForCreateTopics = nonExistingTopics.filter { topic => + !authorize(request.session, Create, new Resource(Topic, topic)) + } + authorizedTopics --= unauthorizedForCreateTopics } } } @@ -1424,16 +1427,20 @@ class KafkaApis(val requestChannel: RequestChannel, (topic, new ApiError(Errors.NOT_CONTROLLER, null)) } sendResponseCallback(results) - } else if (!authorize(request.session, Create, Resource.ClusterResource)) { - val results = createTopicsRequest.topics.asScala.map { case (topic, _) => - (topic, new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, null)) - } 
- sendResponseCallback(results) } else { val (validTopics, duplicateTopics) = createTopicsRequest.topics.asScala.partition { case (topic, _) => !createTopicsRequest.duplicateTopics.contains(topic) } + val (authorizedTopics, unauthorizedTopics) = + if (authorize(request.session, Create, Resource.ClusterResource)) { + (validTopics, Map[String, TopicDetails]()) + } else { + validTopics.partition { case (topic, _) => + authorize(request.session, Create, new Resource(Topic, topic)) + } + } + // Special handling to add duplicate topics to the response def sendResponseWithDuplicatesCallback(results: Map[String, ApiError]): Unit = { @@ -1447,14 +1454,15 @@ class KafkaApis(val requestChannel: RequestChannel, duplicateTopics.keySet.map((_, new ApiError(Errors.INVALID_REQUEST, errorMessage))).toMap } else Map.empty - val completeResults = results ++ duplicatedTopicsResults + val unauthorizedTopicsResults = unauthorizedTopics.keySet.map(_ -> new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null)) + val completeResults = results ++ duplicatedTopicsResults ++ unauthorizedTopicsResults sendResponseCallback(completeResults) } adminManager.createTopics( createTopicsRequest.timeout, createTopicsRequest.validateOnly, - validTopics, + authorizedTopics, sendResponseWithDuplicatesCallback ) } diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 4af9b83f3b8..ea5a155b5dc 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -17,6 +17,7 @@ import java.util import java.util.concurrent.ExecutionException import java.util.regex.Pattern import java.util.{ArrayList, Collections, Properties} +import java.time.Duration import kafka.admin.AdminClient import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, KafkaConsumerGroupService} @@ -73,6 +74,7 @@ class 
AuthorizerIntegrationTest extends BaseRequestTest { val groupResource = new Resource(Group, group) val deleteTopicResource = new Resource(Topic, deleteTopic) val transactionalIdResource = new Resource(TransactionalId, transactionalId) + val createTopicResource = new Resource(Topic, createTopic) val groupReadAcl = Map(groupResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read))) val groupDescribeAcl = Map(groupResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe))) @@ -82,6 +84,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val clusterAlterAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Alter))) val clusterDescribeAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe))) val clusterIdempotentWriteAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, IdempotentWrite))) + val topicCreateAcl = Map(createTopicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create))) val topicReadAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read))) val topicWriteAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write))) val topicDescribeAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe))) @@ -207,7 +210,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.LEADER_AND_ISR -> clusterAcl, ApiKeys.STOP_REPLICA -> clusterAcl, ApiKeys.CONTROLLED_SHUTDOWN -> clusterAcl, - ApiKeys.CREATE_TOPICS -> clusterCreateAcl, + ApiKeys.CREATE_TOPICS -> topicCreateAcl, ApiKeys.DELETE_TOPICS -> topicDeleteAcl, ApiKeys.DELETE_RECORDS -> topicDeleteAcl, ApiKeys.OFFSET_FOR_LEADER_EPOCH -> clusterAcl, @@ -492,6 +495,18 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } } + @Test + def testCreateTopicAuthorizationWithClusterCreate() { + removeAllAcls() + val resources = Set[ResourceType](Topic) + + 
sendRequestAndVerifyResponseError(ApiKeys.CREATE_TOPICS, createTopicsRequest, resources, isAuthorized = false) + + for ((resource, acls) <- clusterCreateAcl) + addAndVerifyAcls(acls, resource) + sendRequestAndVerifyResponseError(ApiKeys.CREATE_TOPICS, createTopicsRequest, resources, isAuthorized = true) + } + @Test def testFetchFollowerRequest() { val key = ApiKeys.FETCH @@ -551,18 +566,30 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } @Test - def testCreatePermissionNeededForWritingToNonExistentTopic() { - val newTopic = "newTopic" - val topicPartition = new TopicPartition(newTopic, 0) - addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), new Resource(Topic, newTopic)) + def testCreatePermissionOnTopicToWriteToNonExistentTopic() { + testCreatePermissionNeededToWriteToNonExistentTopic(Topic) + } + + @Test + def testCreatePermissionOnClusterToWriteToNonExistentTopic() { + testCreatePermissionNeededToWriteToNonExistentTopic(Cluster) + } + + private def testCreatePermissionNeededToWriteToNonExistentTopic(resType: ResourceType) { + val topicPartition = new TopicPartition(createTopic, 0) + val newTopicResource = new Resource(Topic, createTopic) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), newTopicResource) try { sendRecords(numRecords, topicPartition) Assert.fail("should have thrown exception") } catch { - case e: TopicAuthorizationException => assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics()) + case e: TopicAuthorizationException => + assertEquals(Collections.singleton(createTopic), e.unauthorizedTopics()) } - addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource) + val resource = if (resType == Topic) newTopicResource else Resource.ClusterResource + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), resource) + sendRecords(numRecords, topicPartition) } @@ -800,27 +827,37 @@ class 
AuthorizerIntegrationTest extends BaseRequestTest { } @Test - def testCreatePermissionNeededToReadFromNonExistentTopic() { - val newTopic = "newTopic" + def testCreatePermissionOnTopicToReadFromNonExistentTopic() { + testCreatePermissionNeededToReadFromNonExistentTopic("newTopic", + Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), + Topic) + } + + @Test + def testCreatePermissionOnClusterToReadFromNonExistentTopic() { + testCreatePermissionNeededToReadFromNonExistentTopic("newTopic", + Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), + Cluster) + } + + private def testCreatePermissionNeededToReadFromNonExistentTopic(newTopic: String, acls: Set[Acl], resType: ResourceType) { val topicPartition = new TopicPartition(newTopic, 0) val newTopicResource = new Resource(Topic, newTopic) addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), newTopicResource) addAndVerifyAcls(groupReadAcl(groupResource), groupResource) - addAndVerifyAcls(clusterAcl(Resource.ClusterResource), Resource.ClusterResource) - try { - this.consumers.head.assign(List(topicPartition).asJava) - consumeRecords(this.consumers.head) - Assert.fail("should have thrown exception") - } catch { - case e: TopicAuthorizationException => - assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics()) - } + this.consumers.head.assign(List(topicPartition).asJava) + val unauthorizedTopics = intercept[TopicAuthorizationException] { + (0 until 10).foreach(_ => consumers.head.poll(Duration.ofMillis(50L))) + }.unauthorizedTopics + assertEquals(Collections.singleton(newTopic), unauthorizedTopics) - addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), newTopicResource) - addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource) + val resource = if (resType == Topic) newTopicResource else Resource.ClusterResource + addAndVerifyAcls(acls, resource) - sendRecords(numRecords, topicPartition) - 
consumeRecords(this.consumers.head, topic = newTopic, part = 0) + TestUtils.waitUntilTrue(() => { + this.consumers.head.poll(Duration.ofMillis(50L)) + this.zkClient.topicExists(newTopic) + }, "Expected topic was not created") } @Test(expected = classOf[AuthorizationException]) diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index 4189ce3610b..c81b32d121b 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -60,7 +60,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas override val serverCount = 3 override def configureSecurityBeforeServersStart() { - AclCommand.main(clusterAclArgs) + AclCommand.main(clusterActionArgs) AclCommand.main(topicBrokerReadAclArgs) } @@ -82,23 +82,20 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas val wildcardTopicResource = new Resource(Topic, wildcard) val wildcardGroupResource = new Resource(Group, wildcard) - // Arguments to AclCommand to set ACLs. There are three definitions here: - // 1- Provides read and write access to topic - // 2- Provides only write access to topic - // 3- Provides read access to consumer group - def clusterAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--cluster", - s"--operation=ClusterAction", - s"--allow-principal=$kafkaPrincipalType:$kafkaPrincipal") + // Arguments to AclCommand to set ACLs. 
+ def clusterActionArgs: Array[String] = Array("--authorizer-properties", + s"zookeeper.connect=$zkConnect", + s"--add", + s"--cluster", + s"--operation=ClusterAction", + s"--allow-principal=$kafkaPrincipalType:$kafkaPrincipal") def topicBrokerReadAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$wildcard", - s"--operation=Read", - s"--allow-principal=$kafkaPrincipalType:$kafkaPrincipal") - def produceAclArgs: Array[String] = Array("--authorizer-properties", + s"zookeeper.connect=$zkConnect", + s"--add", + s"--topic=$wildcard", + s"--operation=Read", + s"--allow-principal=$kafkaPrincipalType:$kafkaPrincipal") + def produceAclArgs(topic: String): Array[String] = Array("--authorizer-properties", s"zookeeper.connect=$zkConnect", s"--add", s"--topic=$topic", @@ -124,13 +121,13 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas s"--topic=$topic", s"--operation=Write", s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") - def consumeAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$topic", - s"--group=$group", - s"--consumer", - s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") + def consumeAclArgs(topic: String): Array[String] = Array("--authorizer-properties", + s"zookeeper.connect=$zkConnect", + s"--add", + s"--topic=$topic", + s"--group=$group", + s"--consumer", + s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") def groupAclArgs: Array[String] = Array("--authorizer-properties", s"zookeeper.connect=$zkConnect", s"--add", @@ -138,13 +135,13 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas s"--operation=Read", s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") def produceConsumeWildcardAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$wildcard", - s"--group=$wildcard", 
- s"--consumer", - s"--producer", - s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") + s"zookeeper.connect=$zkConnect", + s"--add", + s"--topic=$wildcard", + s"--group=$wildcard", + s"--consumer", + s"--producer", + s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") def ClusterActionAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, kafkaPrincipal), Allow, Acl.WildCardHost, ClusterAction)) def TopicBrokerReadAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, kafkaPrincipal), Allow, Acl.WildCardHost, Read)) @@ -152,6 +149,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas def TopicReadAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Read)) def TopicWriteAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Write)) def TopicDescribeAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Describe)) + def TopicCreateAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Create)) // The next two configuration parameters enable ZooKeeper secure ACLs // and sets the Kafka authorizer, both necessary to enable security. 
this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") @@ -160,6 +158,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1") this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") this.serverConfig.setProperty(KafkaConfig.MinInSyncReplicasProp, "3") + this.serverConfig.setProperty(KafkaConfig.DefaultReplicationFactorProp, "3") this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group") /** @@ -200,14 +199,14 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas */ @Test def testProduceConsumeViaAssign(): Unit = { - setAclsAndProduce() + setAclsAndProduce(tp) consumers.head.assign(List(tp).asJava) consumeRecords(this.consumers.head, numRecords) } @Test def testProduceConsumeViaSubscribe(): Unit = { - setAclsAndProduce() + setAclsAndProduce(tp) consumers.head.subscribe(List(topic).asJava) consumeRecords(this.consumers.head, numRecords) } @@ -223,16 +222,25 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas private def setWildcardResourceAcls() { AclCommand.main(produceConsumeWildcardAclArgs) servers.foreach { s => - TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicBrokerReadAcl, s.apis.authorizer.get, wildcardTopicResource) + TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl ++ TopicBrokerReadAcl, s.apis.authorizer.get, wildcardTopicResource) TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, wildcardGroupResource) } } - protected def setAclsAndProduce() { - AclCommand.main(produceAclArgs) - AclCommand.main(consumeAclArgs) + @Test + def testProduceConsumeTopicAutoCreateTopicCreateAcl(): Unit = { + // topic2 is not created on setup() + val tp2 = new TopicPartition("topic2", 0) + setAclsAndProduce(tp2) + consumers.head.assign(List(tp2).asJava) + 
consumeRecords(this.consumers.head, numRecords, topic = tp2.topic) + } + + protected def setAclsAndProduce(tp: TopicPartition) { + AclCommand.main(produceAclArgs(tp.topic)) + AclCommand.main(consumeAclArgs(tp.topic)) servers.foreach { s => - TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) + TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, new Resource(Topic, tp.topic)) TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) } sendRecords(numRecords, tp) @@ -283,10 +291,10 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas } private def noConsumeWithoutDescribeAclSetup(): Unit = { - AclCommand.main(produceAclArgs) + AclCommand.main(produceAclArgs(tp.topic)) AclCommand.main(groupAclArgs) servers.foreach { s => - TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) + TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, topicResource) TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) } @@ -328,10 +336,10 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas } private def noConsumeWithDescribeAclSetup(): Unit = { - AclCommand.main(produceAclArgs) + AclCommand.main(produceAclArgs(tp.topic)) AclCommand.main(groupAclArgs) servers.foreach { s => - TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) + TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, topicResource) TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) } sendRecords(numRecords, tp) @@ -343,9 +351,9 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas */ @Test def testNoGroupAcl(): Unit = { - 
AclCommand.main(produceAclArgs) + AclCommand.main(produceAclArgs(tp.topic)) servers.foreach { s => - TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) + TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, topicResource) } sendRecords(numRecords, tp) consumers.head.assign(List(tp).asJava) diff --git a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala index a5bf33171a4..643cd4ce6af 100644 --- a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala @@ -56,7 +56,7 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest { */ @Test(timeout = 15000) def testTwoConsumersWithDifferentSaslCredentials(): Unit = { - setAclsAndProduce() + setAclsAndProduce(tp) val consumer1 = consumers.head val consumer2Config = new Properties diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala index 9b6272860ed..9197f79882f 100644 --- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala @@ -49,8 +49,8 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { ) private val ResourceToOperations = Map[Set[Resource], (Set[Operation], Array[String])]( - TopicResources -> (Set(Read, Write, Describe, Delete, DescribeConfigs, AlterConfigs), - Array("--operation", "Read" , "--operation", "Write", "--operation", "Describe", "--operation", "Delete", + TopicResources -> (Set(Read, Write, Create, Describe, Delete, DescribeConfigs, AlterConfigs), + Array("--operation", "Read" , "--operation", "Write", "--operation", "Create", "--operation", "Describe", "--operation", "Delete", "--operation", "DescribeConfigs", 
"--operation", "AlterConfigs")), Set(Resource.ClusterResource) -> (Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite), Array("--operation", "Create", "--operation", "ClusterAction", "--operation", "DescribeConfigs", @@ -61,10 +61,10 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { ) private def ProducerResourceToAcls(enableIdempotence: Boolean = false) = Map[Set[Resource], Set[Acl]]( - TopicResources -> AclCommand.getAcls(Users, Allow, Set(Write, Describe), Hosts), + TopicResources -> AclCommand.getAcls(Users, Allow, Set(Write, Describe, Create), Hosts), TransactionalIdResources -> AclCommand.getAcls(Users, Allow, Set(Write, Describe), Hosts), - Set(Resource.ClusterResource) -> AclCommand.getAcls(Users, Allow, Set(Some(Create), - if (enableIdempotence) Some(IdempotentWrite) else None).flatten, Hosts) + Set(Resource.ClusterResource) -> AclCommand.getAcls(Users, Allow, + Set(if (enableIdempotence) Some(IdempotentWrite) else None).flatten, Hosts) ) private val ConsumerResourceToAcls = Map[Set[Resource], Set[Acl]]( diff --git a/docs/security.html b/docs/security.html index 06dd8fbd1fc..0ef37d75e92 100644 --- a/docs/security.html +++ b/docs/security.html @@ -1133,7 +1133,7 @@ <h4><a id="security_authz_cli" href="#security_authz_cli">Command Line Interface <tr> <td>--producer</td> <td> Convenience option to add/remove acls for producer role. This will generate acls that allows WRITE, - DESCRIBE on topic and CREATE on cluster.</td> + DESCRIBE and CREATE on topic.</td> <td></td> <td>Convenience</td> </tr> diff --git a/docs/upgrade.html b/docs/upgrade.html index 532c8bc5728..4f1c5b34767 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -98,13 +98,10 @@ <h5><a id="upgrade_200_notable" href="#upgrade_200_notable">Notable changes in 2 will be removed in a future version.</li> <li>The internal method <code>kafka.admin.AdminClient.deleteRecordsBefore</code> has been removed. 
Users are encouraged to migrate to <code>org.apache.kafka.clients.admin.AdminClient.deleteRecords</code>.</li> <li>The tool kafka.tools.ReplayLogProducer has been removed.</li> - <li><a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-176%3A+Remove+deprecated+new-consumer+option+for+tools">KIP-176</a> finally removes - the <code>--new-consumer</code> option for all consumer based tools as <code>kafka-console-consumer</code>, <code>kafka-consumer-perf-test</code> - and <code>kafka-consumer-groups</code>. - The new consumer is automatically used if the bootstrap servers list is provided on the command line - otherwise, when the zookeeper connection is provided, the old consumer is used. - The <code>--new-consumer</code> option had already been ignored as the way of selecting the consumer since Kafka 1.0.0, - this KIP just removes the option. + <li>The AclCommand tool <code>--producer</code> convenience option uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-277+-+Fine+Grained+ACL+for+CreateTopics+API">KIP-277</a> finer grained ACL on the given topic. </li> + <li><a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-176%3A+Remove+deprecated+new-consumer+option+for+tools">KIP-176</a> removes + the <code>--new-consumer</code> option for all consumer based tools. This option is redundant since the new consumer is automatically + used if --bootstrap-server is defined. </li> </ul> ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> KIP-277 - Fine Grained ACL for CreateTopics API
> -----------------------------------------------
>
> Key: KAFKA-6726
> URL: https://issues.apache.org/jira/browse/KAFKA-6726
> Project: Kafka
> Issue Type: Improvement
> Components: core, tools
> Reporter: Edoardo Comar
> Assignee: Edoardo Comar
> Priority: Major
>
> issue to track implementation of
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-277+-+Fine+Grained+ACL+for+CreateTopics+API

--
This message was sent by Atlassian JIRA (v7.6.3#76005)