Repository: kafka
Updated Branches:
  refs/heads/trunk 4065ffb3e -> 3410f02fe


KAFKA-4585: Lower the Minimum Required ACL Permission of OffsetFetch (KIP-163)

Details can be found in the 
[KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-163%3A+Lower+the+Minimum+Required+ACL+Permission+of+OffsetFetch).

Author: Vahid Hashemian <vahidhashem...@us.ibm.com>

Reviewers: Ewen Cheslack-Postava <e...@confluent.io>

Closes #3661 from vahidhashemian/KAFKA-4585


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3410f02f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3410f02f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3410f02f

Branch: refs/heads/trunk
Commit: 3410f02fec6bc8633ebb58e3c1a8f9c2e49a1aa4
Parents: 4065ffb
Author: Vahid Hashemian <vahidhashem...@us.ibm.com>
Authored: Thu Sep 7 11:40:54 2017 -0700
Committer: Ewen Cheslack-Postava <m...@ewencp.org>
Committed: Thu Sep 7 11:40:54 2017 -0700

----------------------------------------------------------------------
 .../src/main/scala/kafka/server/KafkaApis.scala |  2 +-
 .../kafka/api/AuthorizerIntegrationTest.scala   | 47 +++++++++++++++++---
 2 files changed, 43 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3410f02f/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 17c3f2d..4fd906f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -989,7 +989,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     def createResponse(requestThrottleMs: Int): AbstractResponse = {
       val offsetFetchResponse =
         // reject the request if not authorized to the group
-        if (!authorize(request.session, Read, new Resource(Group, 
offsetFetchRequest.groupId)))
+        if (!authorize(request.session, Describe, new Resource(Group, 
offsetFetchRequest.groupId)))
           offsetFetchRequest.getErrorResponse(requestThrottleMs, 
Errors.GROUP_AUTHORIZATION_FAILED)
         else {
           if (header.apiVersion == 0) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3410f02f/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index ccb2719..dcd2038 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -18,13 +18,17 @@ import java.util.concurrent.ExecutionException
 import java.util.regex.Pattern
 import java.util.{ArrayList, Collections, Properties}
 
+import kafka.admin.AdminClient
+import kafka.admin.AdminUtils
+import kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions
+import kafka.admin.ConsumerGroupCommand.KafkaConsumerGroupService
 import kafka.common.TopicAndPartition
+import kafka.log.LogConfig
+import kafka.network.SocketServer
 import kafka.security.auth._
 import kafka.server.{BaseRequestTest, KafkaConfig}
 import kafka.utils.TestUtils
-import kafka.admin.AdminUtils
-import kafka.log.LogConfig
-import kafka.network.SocketServer
+
 import org.apache.kafka.clients.consumer.OffsetAndMetadata
 import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.clients.consumer._
@@ -116,6 +120,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.UPDATE_METADATA_KEY -> classOf[requests.UpdateMetadataResponse],
       ApiKeys.JOIN_GROUP -> classOf[JoinGroupResponse],
       ApiKeys.SYNC_GROUP -> classOf[SyncGroupResponse],
+      ApiKeys.DESCRIBE_GROUPS -> classOf[DescribeGroupsResponse],
       ApiKeys.HEARTBEAT -> classOf[HeartbeatResponse],
       ApiKeys.LEAVE_GROUP -> classOf[LeaveGroupResponse],
       ApiKeys.LEADER_AND_ISR -> classOf[requests.LeaderAndIsrResponse],
@@ -150,6 +155,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.UPDATE_METADATA_KEY -> ((resp: requests.UpdateMetadataResponse) => 
resp.error),
     ApiKeys.JOIN_GROUP -> ((resp: JoinGroupResponse) => resp.error),
     ApiKeys.SYNC_GROUP -> ((resp: SyncGroupResponse) => resp.error),
+    ApiKeys.DESCRIBE_GROUPS -> ((resp: DescribeGroupsResponse) => 
resp.groups.get(group).error),
     ApiKeys.HEARTBEAT -> ((resp: HeartbeatResponse) => resp.error),
     ApiKeys.LEAVE_GROUP -> ((resp: LeaveGroupResponse) => resp.error),
     ApiKeys.LEADER_AND_ISR -> ((resp: requests.LeaderAndIsrResponse) => 
resp.responses.asScala.find(_._1 == tp).get._2),
@@ -182,11 +188,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.FETCH -> topicReadAcl,
     ApiKeys.LIST_OFFSETS -> topicDescribeAcl,
     ApiKeys.OFFSET_COMMIT -> (topicReadAcl ++ groupReadAcl),
-    ApiKeys.OFFSET_FETCH -> (topicReadAcl ++ groupReadAcl),
+    ApiKeys.OFFSET_FETCH -> (topicReadAcl ++ groupDescribeAcl),
     ApiKeys.FIND_COORDINATOR -> (topicReadAcl ++ groupDescribeAcl ++ 
transactionalIdDescribeAcl),
     ApiKeys.UPDATE_METADATA_KEY -> clusterAcl,
     ApiKeys.JOIN_GROUP -> groupReadAcl,
     ApiKeys.SYNC_GROUP -> groupReadAcl,
+    ApiKeys.DESCRIBE_GROUPS -> groupDescribeAcl,
     ApiKeys.HEARTBEAT -> groupReadAcl,
     ApiKeys.LEAVE_GROUP -> groupReadAcl,
     ApiKeys.LEADER_AND_ISR -> clusterAcl,
@@ -300,6 +307,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     new SyncGroupRequest.Builder(group, 1, "", Map[String, 
ByteBuffer]().asJava).build()
   }
 
+  private def createDescribeGroupsRequest = {
+    new DescribeGroupsRequest.Builder(List(group).asJava).build()
+  }
+
   private def createOffsetCommitRequest = {
     new requests.OffsetCommitRequest.Builder(
       group, Map(tp -> new requests.OffsetCommitRequest.PartitionData(0, 
"metadata")).asJava).
@@ -365,6 +376,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.UPDATE_METADATA_KEY -> createUpdateMetadataRequest,
       ApiKeys.JOIN_GROUP -> createJoinGroupRequest,
       ApiKeys.SYNC_GROUP -> createSyncGroupRequest,
+      ApiKeys.DESCRIBE_GROUPS -> createDescribeGroupsRequest,
       ApiKeys.OFFSET_COMMIT -> createOffsetCommitRequest,
       ApiKeys.HEARTBEAT -> heartbeatRequest,
       ApiKeys.LEAVE_GROUP -> leaveGroupRequest,
@@ -845,7 +857,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testOffsetFetchTopicDescribe() {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), groupResource)
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), topicResource)
     this.consumers.head.assign(List(tp).asJava)
     this.consumers.head.position(tp)
@@ -871,6 +883,31 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     this.consumers.head.partitionsFor(topic)
   }
 
+  @Test(expected = classOf[GroupAuthorizationException])
+  def testDescribeGroupApiWithNoGroupAcl() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), topicResource)
+    AdminClient.createSimplePlaintext(brokerList).describeConsumerGroup(group)
+  }
+
+  @Test
+  def testDescribeGroupApiWithGroupDescribe() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), groupResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), topicResource)
+    AdminClient.createSimplePlaintext(brokerList).describeConsumerGroup(group)
+  }
+
+  @Test
+  def testDescribeGroupCliWithGroupDescribe() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), groupResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), topicResource)
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", 
"--group", group)
+    val opts = new ConsumerGroupCommandOptions(cgcArgs)
+    val consumerGroupService = new KafkaConsumerGroupService(opts)
+    consumerGroupService.describeGroup()
+    consumerGroupService.close()
+  }
+
   @Test
   def testUnauthorizedDeleteWithoutDescribe() {
     val response = connectAndSend(deleteTopicsRequest, ApiKeys.DELETE_TOPICS)

Reply via email to