This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7a1d1d9  KAFKA-12212; Bump Metadata API version to remove 
`ClusterAuthorizedOperations` fields (KIP-700) (#9945)
7a1d1d9 is described below

commit 7a1d1d9a69a241efd68e572badee999229b3942f
Author: David Jacot <[email protected]>
AuthorDate: Fri Jan 22 09:06:37 2021 +0100

    KAFKA-12212; Bump Metadata API version to remove 
`ClusterAuthorizedOperations` fields (KIP-700) (#9945)
    
    This patch bumps the version of the Metadata API and removes the 
`IncludeClusterAuthorizedOperations` and the 
`IncludeClusterAuthorizedOperations` fields from version 11 and onward.
    
    Reviewers: Chia-Ping Tsai <[email protected]>, Rajini Sivaram 
<[email protected]>
---
 .../resources/common/message/MetadataRequest.json  | 10 ++-
 .../resources/common/message/MetadataResponse.json | 10 ++-
 core/src/main/scala/kafka/server/KafkaApis.scala   | 18 ++---
 .../kafka/api/AuthorizerIntegrationTest.scala      | 77 ++++++++++++++++++----
 4 files changed, 89 insertions(+), 26 deletions(-)

diff --git a/clients/src/main/resources/common/message/MetadataRequest.json 
b/clients/src/main/resources/common/message/MetadataRequest.json
index 30316e6..02af116 100644
--- a/clients/src/main/resources/common/message/MetadataRequest.json
+++ b/clients/src/main/resources/common/message/MetadataRequest.json
@@ -17,7 +17,7 @@
   "apiKey": 3,
   "type": "request",
   "name": "MetadataRequest",
-  "validVersions": "0-10",
+  "validVersions": "0-11",
   "flexibleVersions": "9+",
   "fields": [
     // In version 0, an empty array indicates "request metadata for all 
topics."  In version 1 and
@@ -31,7 +31,11 @@
     // Starting in version 8, authorized operations can be requested for 
cluster and topic resource.
     //
     // Version 9 is the first flexible version.
-    // Version 10 add topicId
+    //
+    // Version 10 adds topicId.
+    //
+    // Version 11 deprecates IncludeClusterAuthorizedOperations field. This is 
now exposed
+    // by the DescribeCluster API (KIP-700).
     { "name": "Topics", "type": "[]MetadataRequestTopic", "versions": "0+", 
"nullableVersions": "1+",
       "about": "The topics to fetch metadata for.", "fields": [
       { "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": 
true, "about": "The topic id." },
@@ -40,7 +44,7 @@
     ]},
     { "name": "AllowAutoTopicCreation", "type": "bool", "versions": "4+", 
"default": "true", "ignorable": false,
       "about": "If this is true, the broker may auto-create topics that we 
requested which do not already exist, if it is configured to do so." },
-    { "name": "IncludeClusterAuthorizedOperations", "type": "bool", 
"versions": "8+",
+    { "name": "IncludeClusterAuthorizedOperations", "type": "bool", 
"versions": "8-10",
       "about": "Whether to include cluster authorized operations." },
     { "name": "IncludeTopicAuthorizedOperations", "type": "bool", "versions": 
"8+",
       "about": "Whether to include topic authorized operations." }
diff --git a/clients/src/main/resources/common/message/MetadataResponse.json 
b/clients/src/main/resources/common/message/MetadataResponse.json
index b54b830..70638d2 100644
--- a/clients/src/main/resources/common/message/MetadataResponse.json
+++ b/clients/src/main/resources/common/message/MetadataResponse.json
@@ -36,8 +36,12 @@
   // Starting in version 8, brokers can send authorized operations for topic 
and cluster.
   //
   // Version 9 is the first flexible version.
-  // Version 10 add topicId
-  "validVersions": "0-10",
+  //
+  // Version 10 adds topicId.
+  //
+  // Version 11 deprecates ClusterAuthorizedOperations. This is now exposed
+  // by the DescribeCluster API (KIP-700).
+  "validVersions": "0-11",
   "flexibleVersions": "9+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", 
"ignorable": true,
@@ -86,7 +90,7 @@
       { "name": "TopicAuthorizedOperations", "type": "int32", "versions": 
"8+", "default": "-2147483648",
         "about": "32-bit bitfield to represent authorized operations for this 
topic." }
     ]},
-    { "name": "ClusterAuthorizedOperations", "type": "int32", "versions": 
"8+", "default": "-2147483648",
+    { "name": "ClusterAuthorizedOperations", "type": "int32", "versions": 
"8-10", "default": "-2147483648",
       "about": "32-bit bitfield to represent authorized operations for this 
cluster." }
   ]
 }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 9004cef..f2d7a8c 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1245,14 +1245,16 @@ class KafkaApis(val requestChannel: RequestChannel,
         )
       }
 
-    var clusterAuthorizedOperations = Int.MinValue
-    if (request.header.apiVersion >= 8) {
+    var clusterAuthorizedOperations = Int.MinValue // Default value in the 
schema
+    if (requestVersion >= 8) {
       // get cluster authorized operations
-      if (metadataRequest.data.includeClusterAuthorizedOperations) {
-        if (authHelper.authorize(request.context, DESCRIBE, CLUSTER, 
CLUSTER_NAME))
-          clusterAuthorizedOperations = 
authHelper.authorizedOperations(request, Resource.CLUSTER)
-        else
-          clusterAuthorizedOperations = 0
+      if (requestVersion <= 10) {
+        if (metadataRequest.data.includeClusterAuthorizedOperations) {
+          if (authHelper.authorize(request.context, DESCRIBE, CLUSTER, 
CLUSTER_NAME))
+            clusterAuthorizedOperations = 
authHelper.authorizedOperations(request, Resource.CLUSTER)
+          else
+            clusterAuthorizedOperations = 0
+        }
       }
 
       // get topic authorized operations
@@ -3211,7 +3213,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleDescribeCluster(request: RequestChannel.Request): Unit = {
     val describeClusterRequest = request.body[DescribeClusterRequest]
 
-    var clusterAuthorizedOperations = Int.MinValue
+    var clusterAuthorizedOperations = Int.MinValue // Default value in the 
schema
     // get cluster authorized operations
     if (describeClusterRequest.data.includeClusterAuthorizedOperations) {
       if (authHelper.authorize(request.context, DESCRIBE, CLUSTER, 
CLUSTER_NAME))
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index edda7a9..a2cc596 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -44,6 +44,7 @@ import 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt
 import 
org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
 import 
org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, 
ListOffsetsTopic}
+import org.apache.kafka.common.message.MetadataRequestData
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection
@@ -1854,16 +1855,61 @@ class AuthorizerIntegrationTest extends BaseRequestTest 
{
   }
 
   @Test
+  def testMetadataClusterAuthorizedOperationsWithoutDescribeCluster(): Unit = {
+    removeAllClientAcls()
+
+    // MetadataRequest versions older than 1 are not supported.
+    for (version <- 1 to ApiKeys.METADATA.latestVersion) {
+      testMetadataClusterClusterAuthorizedOperations(version.toShort, 0)
+    }
+  }
+
+  @Test
+  def testMetadataClusterAuthorizedOperationsWithDescribeAndAlterCluster(): 
Unit = {
+    removeAllClientAcls()
+
+    val clusterResource = new ResourcePattern(ResourceType.CLUSTER, 
Resource.CLUSTER_NAME, PatternType.LITERAL)
+    val acls = Set(
+      new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, 
ALLOW),
+      new AccessControlEntry(clientPrincipalString, WildcardHost, ALTER, ALLOW)
+    )
+    addAndVerifyAcls(acls, clusterResource)
+
+    val expectedClusterAuthorizedOperations = Utils.to32BitField(
+      acls.map(_.operation.code.asInstanceOf[JByte]).asJava)
+
+    // MetadataRequest versions older than 1 are not supported.
+    for (version <- 1 to ApiKeys.METADATA.latestVersion) {
+      testMetadataClusterClusterAuthorizedOperations(version.toShort, 
expectedClusterAuthorizedOperations)
+    }
+  }
+
+  private def testMetadataClusterClusterAuthorizedOperations(
+    version: Short,
+    expectedClusterAuthorizedOperations: Int
+  ): Unit = {
+    val metadataRequest = new MetadataRequest.Builder(new MetadataRequestData()
+      .setTopics(Collections.emptyList())
+      .setAllowAutoTopicCreation(true)
+      .setIncludeClusterAuthorizedOperations(true))
+      .build(version)
+
+    // The expected value is only verified if the request supports it.
+    if (version >= 8 && version <= 10) {
+      val metadataResponse = 
connectAndReceive[MetadataResponse](metadataRequest)
+      assertEquals(expectedClusterAuthorizedOperations, 
metadataResponse.data.clusterAuthorizedOperations)
+    } else {
+      assertThrows(classOf[UnsupportedVersionException],
+        () => connectAndReceive[MetadataResponse](metadataRequest))
+    }
+  }
+
+  @Test
   def testDescribeClusterClusterAuthorizedOperationsWithoutDescribeCluster(): 
Unit = {
     removeAllClientAcls()
 
     for (version <- ApiKeys.DESCRIBE_CLUSTER.oldestVersion to 
ApiKeys.DESCRIBE_CLUSTER.latestVersion) {
-      val describeClusterRequest = new DescribeClusterRequest.Builder(new 
DescribeClusterRequestData()
-        .setIncludeClusterAuthorizedOperations(true))
-        .build(version.toShort)
-      val describeClusterResponse = 
connectAndReceive[DescribeClusterResponse](describeClusterRequest)
-
-      assertEquals(0, describeClusterResponse.data.clusterAuthorizedOperations)
+      testDescribeClusterClusterAuthorizedOperations(version.toShort, 0)
     }
   }
 
@@ -1882,15 +1928,22 @@ class AuthorizerIntegrationTest extends BaseRequestTest 
{
       acls.map(_.operation.code.asInstanceOf[JByte]).asJava)
 
     for (version <- ApiKeys.DESCRIBE_CLUSTER.oldestVersion to 
ApiKeys.DESCRIBE_CLUSTER.latestVersion) {
-      val describeClusterRequest = new DescribeClusterRequest.Builder(new 
DescribeClusterRequestData()
-        .setIncludeClusterAuthorizedOperations(true))
-        .build(version.toShort)
-      val describeClusterResponse = 
connectAndReceive[DescribeClusterResponse](describeClusterRequest)
-
-      assertEquals(expectedClusterAuthorizedOperations, 
describeClusterResponse.data.clusterAuthorizedOperations)
+      testDescribeClusterClusterAuthorizedOperations(version.toShort, 
expectedClusterAuthorizedOperations)
     }
   }
 
+  private def testDescribeClusterClusterAuthorizedOperations(
+    version: Short,
+    expectedClusterAuthorizedOperations: Int
+  ): Unit = {
+    val describeClusterRequest = new DescribeClusterRequest.Builder(new 
DescribeClusterRequestData()
+      .setIncludeClusterAuthorizedOperations(true))
+      .build(version)
+
+    val describeClusterResponse = 
connectAndReceive[DescribeClusterResponse](describeClusterRequest)
+    assertEquals(expectedClusterAuthorizedOperations, 
describeClusterResponse.data.clusterAuthorizedOperations)
+  }
+
   def removeAllClientAcls(): Unit = {
     val authorizer = servers.head.dataPlaneRequestProcessor.authorizer.get
     val aclEntryFilter = new AccessControlEntryFilter(clientPrincipalString, 
null, AclOperation.ANY, AclPermissionType.ANY)

Reply via email to