Repository: kafka Updated Branches: refs/heads/trunk 4065ffb3e -> 3410f02fe
KAFKA-4585: Lower the Minimum Required ACL Permission of OffsetFetch (KIP-163) Details can be found in the [KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-163%3A+Lower+the+Minimum+Required+ACL+Permission+of+OffsetFetch). Author: Vahid Hashemian <vahidhashem...@us.ibm.com> Reviewers: Ewen Cheslack-Postava <e...@confluent.io> Closes #3661 from vahidhashemian/KAFKA-4585 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3410f02f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3410f02f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3410f02f Branch: refs/heads/trunk Commit: 3410f02fec6bc8633ebb58e3c1a8f9c2e49a1aa4 Parents: 4065ffb Author: Vahid Hashemian <vahidhashem...@us.ibm.com> Authored: Thu Sep 7 11:40:54 2017 -0700 Committer: Ewen Cheslack-Postava <m...@ewencp.org> Committed: Thu Sep 7 11:40:54 2017 -0700 ---------------------------------------------------------------------- .../src/main/scala/kafka/server/KafkaApis.scala | 2 +- .../kafka/api/AuthorizerIntegrationTest.scala | 47 +++++++++++++++++--- 2 files changed, 43 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/3410f02f/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 17c3f2d..4fd906f 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -989,7 +989,7 @@ class KafkaApis(val requestChannel: RequestChannel, def createResponse(requestThrottleMs: Int): AbstractResponse = { val offsetFetchResponse = // reject the request if not authorized to the group - if (!authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId))) + if (!authorize(request.session, Describe, new Resource(Group, offsetFetchRequest.groupId))) offsetFetchRequest.getErrorResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED) else { if (header.apiVersion == 0) { http://git-wip-us.apache.org/repos/asf/kafka/blob/3410f02f/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index ccb2719..dcd2038 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -18,13 +18,17 @@ import java.util.concurrent.ExecutionException import java.util.regex.Pattern import java.util.{ArrayList, Collections, Properties} +import kafka.admin.AdminClient +import kafka.admin.AdminUtils +import kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions +import kafka.admin.ConsumerGroupCommand.KafkaConsumerGroupService import kafka.common.TopicAndPartition +import kafka.log.LogConfig +import kafka.network.SocketServer import kafka.security.auth._ import kafka.server.{BaseRequestTest, KafkaConfig} import kafka.utils.TestUtils -import kafka.admin.AdminUtils -import kafka.log.LogConfig -import kafka.network.SocketServer + import org.apache.kafka.clients.consumer.OffsetAndMetadata import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener import org.apache.kafka.clients.consumer._ @@ -116,6 +120,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.UPDATE_METADATA_KEY -> classOf[requests.UpdateMetadataResponse], ApiKeys.JOIN_GROUP -> classOf[JoinGroupResponse], ApiKeys.SYNC_GROUP -> classOf[SyncGroupResponse], + ApiKeys.DESCRIBE_GROUPS -> classOf[DescribeGroupsResponse], ApiKeys.HEARTBEAT -> classOf[HeartbeatResponse], ApiKeys.LEAVE_GROUP -> classOf[LeaveGroupResponse], ApiKeys.LEADER_AND_ISR -> classOf[requests.LeaderAndIsrResponse], @@ -150,6 +155,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.UPDATE_METADATA_KEY -> ((resp: requests.UpdateMetadataResponse) => resp.error), ApiKeys.JOIN_GROUP -> ((resp: JoinGroupResponse) => resp.error), ApiKeys.SYNC_GROUP -> ((resp: SyncGroupResponse) => resp.error), + ApiKeys.DESCRIBE_GROUPS -> ((resp: DescribeGroupsResponse) => resp.groups.get(group).error), ApiKeys.HEARTBEAT -> ((resp: HeartbeatResponse) => resp.error), ApiKeys.LEAVE_GROUP -> ((resp: LeaveGroupResponse) => resp.error), ApiKeys.LEADER_AND_ISR -> ((resp: requests.LeaderAndIsrResponse) => resp.responses.asScala.find(_._1 == tp).get._2), @@ -182,11 +188,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.FETCH -> topicReadAcl, ApiKeys.LIST_OFFSETS -> topicDescribeAcl, ApiKeys.OFFSET_COMMIT -> (topicReadAcl ++ groupReadAcl), - ApiKeys.OFFSET_FETCH -> (topicReadAcl ++ groupReadAcl), + ApiKeys.OFFSET_FETCH -> (topicReadAcl ++ groupDescribeAcl), ApiKeys.FIND_COORDINATOR -> (topicReadAcl ++ groupDescribeAcl ++ transactionalIdDescribeAcl), ApiKeys.UPDATE_METADATA_KEY -> clusterAcl, ApiKeys.JOIN_GROUP -> groupReadAcl, ApiKeys.SYNC_GROUP -> groupReadAcl, + ApiKeys.DESCRIBE_GROUPS -> groupDescribeAcl, ApiKeys.HEARTBEAT -> groupReadAcl, ApiKeys.LEAVE_GROUP -> groupReadAcl, ApiKeys.LEADER_AND_ISR -> clusterAcl, @@ -300,6 +307,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest { new SyncGroupRequest.Builder(group, 1, "", Map[String, ByteBuffer]().asJava).build() } + private def createDescribeGroupsRequest = { + new DescribeGroupsRequest.Builder(List(group).asJava).build() + } + private def createOffsetCommitRequest = { new requests.OffsetCommitRequest.Builder( group, Map(tp -> new requests.OffsetCommitRequest.PartitionData(0, "metadata")).asJava). @@ -365,6 +376,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.UPDATE_METADATA_KEY -> createUpdateMetadataRequest, ApiKeys.JOIN_GROUP -> createJoinGroupRequest, ApiKeys.SYNC_GROUP -> createSyncGroupRequest, + ApiKeys.DESCRIBE_GROUPS -> createDescribeGroupsRequest, ApiKeys.OFFSET_COMMIT -> createOffsetCommitRequest, ApiKeys.HEARTBEAT -> heartbeatRequest, ApiKeys.LEAVE_GROUP -> leaveGroupRequest, @@ -845,7 +857,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testOffsetFetchTopicDescribe() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), groupResource) addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) this.consumers.head.assign(List(tp).asJava) this.consumers.head.position(tp) @@ -871,6 +883,31 @@ class AuthorizerIntegrationTest extends BaseRequestTest { this.consumers.head.partitionsFor(topic) } + @Test(expected = classOf[GroupAuthorizationException]) + def testDescribeGroupApiWithNoGroupAcl() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) + AdminClient.createSimplePlaintext(brokerList).describeConsumerGroup(group) + } + + @Test + def testDescribeGroupApiWithGroupDescribe() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), groupResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) + AdminClient.createSimplePlaintext(brokerList).describeConsumerGroup(group) + } + + @Test + def testDescribeGroupCliWithGroupDescribe() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), groupResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) + + val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group) + val opts = new ConsumerGroupCommandOptions(cgcArgs) + val consumerGroupService = new KafkaConsumerGroupService(opts) + consumerGroupService.describeGroup() + consumerGroupService.close() + } + @Test def testUnauthorizedDeleteWithoutDescribe() { val response = connectAndSend(deleteTopicsRequest, ApiKeys.DELETE_TOPICS)