This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7a1d1d9 KAFKA-12212; Bump Metadata API version to remove
`ClusterAuthorizedOperations` fields (KIP-700) (#9945)
7a1d1d9 is described below
commit 7a1d1d9a69a241efd68e572badee999229b3942f
Author: David Jacot <[email protected]>
AuthorDate: Fri Jan 22 09:06:37 2021 +0100
KAFKA-12212; Bump Metadata API version to remove
`ClusterAuthorizedOperations` fields (KIP-700) (#9945)
This patch bumps the version of the Metadata API and removes the
`IncludeClusterAuthorizedOperations` request field and the
`ClusterAuthorizedOperations` response field from version 11 and onward.
Reviewers: Chia-Ping Tsai <[email protected]>, Rajini Sivaram
<[email protected]>
---
.../resources/common/message/MetadataRequest.json | 10 ++-
.../resources/common/message/MetadataResponse.json | 10 ++-
core/src/main/scala/kafka/server/KafkaApis.scala | 18 ++---
.../kafka/api/AuthorizerIntegrationTest.scala | 77 ++++++++++++++++++----
4 files changed, 89 insertions(+), 26 deletions(-)
diff --git a/clients/src/main/resources/common/message/MetadataRequest.json
b/clients/src/main/resources/common/message/MetadataRequest.json
index 30316e6..02af116 100644
--- a/clients/src/main/resources/common/message/MetadataRequest.json
+++ b/clients/src/main/resources/common/message/MetadataRequest.json
@@ -17,7 +17,7 @@
"apiKey": 3,
"type": "request",
"name": "MetadataRequest",
- "validVersions": "0-10",
+ "validVersions": "0-11",
"flexibleVersions": "9+",
"fields": [
// In version 0, an empty array indicates "request metadata for all
topics." In version 1 and
@@ -31,7 +31,11 @@
// Starting in version 8, authorized operations can be requested for
cluster and topic resource.
//
// Version 9 is the first flexible version.
- // Version 10 add topicId
+ //
+ // Version 10 adds topicId.
+ //
+ // Version 11 deprecates IncludeClusterAuthorizedOperations field. This is
now exposed
+ // by the DescribeCluster API (KIP-700).
{ "name": "Topics", "type": "[]MetadataRequestTopic", "versions": "0+",
"nullableVersions": "1+",
"about": "The topics to fetch metadata for.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable":
true, "about": "The topic id." },
@@ -40,7 +44,7 @@
]},
{ "name": "AllowAutoTopicCreation", "type": "bool", "versions": "4+",
"default": "true", "ignorable": false,
"about": "If this is true, the broker may auto-create topics that we
requested which do not already exist, if it is configured to do so." },
- { "name": "IncludeClusterAuthorizedOperations", "type": "bool",
"versions": "8+",
+ { "name": "IncludeClusterAuthorizedOperations", "type": "bool",
"versions": "8-10",
"about": "Whether to include cluster authorized operations." },
{ "name": "IncludeTopicAuthorizedOperations", "type": "bool", "versions":
"8+",
"about": "Whether to include topic authorized operations." }
diff --git a/clients/src/main/resources/common/message/MetadataResponse.json
b/clients/src/main/resources/common/message/MetadataResponse.json
index b54b830..70638d2 100644
--- a/clients/src/main/resources/common/message/MetadataResponse.json
+++ b/clients/src/main/resources/common/message/MetadataResponse.json
@@ -36,8 +36,12 @@
// Starting in version 8, brokers can send authorized operations for topic
and cluster.
//
// Version 9 is the first flexible version.
- // Version 10 add topicId
- "validVersions": "0-10",
+ //
+ // Version 10 adds topicId.
+ //
+ // Version 11 deprecates ClusterAuthorizedOperations. This is now exposed
+ // by the DescribeCluster API (KIP-700).
+ "validVersions": "0-11",
"flexibleVersions": "9+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "3+",
"ignorable": true,
@@ -86,7 +90,7 @@
{ "name": "TopicAuthorizedOperations", "type": "int32", "versions":
"8+", "default": "-2147483648",
"about": "32-bit bitfield to represent authorized operations for this
topic." }
]},
- { "name": "ClusterAuthorizedOperations", "type": "int32", "versions":
"8+", "default": "-2147483648",
+ { "name": "ClusterAuthorizedOperations", "type": "int32", "versions":
"8-10", "default": "-2147483648",
"about": "32-bit bitfield to represent authorized operations for this
cluster." }
]
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 9004cef..f2d7a8c 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1245,14 +1245,16 @@ class KafkaApis(val requestChannel: RequestChannel,
)
}
- var clusterAuthorizedOperations = Int.MinValue
- if (request.header.apiVersion >= 8) {
+ var clusterAuthorizedOperations = Int.MinValue // Default value in the
schema
+ if (requestVersion >= 8) {
// get cluster authorized operations
- if (metadataRequest.data.includeClusterAuthorizedOperations) {
- if (authHelper.authorize(request.context, DESCRIBE, CLUSTER,
CLUSTER_NAME))
- clusterAuthorizedOperations =
authHelper.authorizedOperations(request, Resource.CLUSTER)
- else
- clusterAuthorizedOperations = 0
+ if (requestVersion <= 10) {
+ if (metadataRequest.data.includeClusterAuthorizedOperations) {
+ if (authHelper.authorize(request.context, DESCRIBE, CLUSTER,
CLUSTER_NAME))
+ clusterAuthorizedOperations =
authHelper.authorizedOperations(request, Resource.CLUSTER)
+ else
+ clusterAuthorizedOperations = 0
+ }
}
// get topic authorized operations
@@ -3211,7 +3213,7 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleDescribeCluster(request: RequestChannel.Request): Unit = {
val describeClusterRequest = request.body[DescribeClusterRequest]
- var clusterAuthorizedOperations = Int.MinValue
+ var clusterAuthorizedOperations = Int.MinValue // Default value in the
schema
// get cluster authorized operations
if (describeClusterRequest.data.includeClusterAuthorizedOperations) {
if (authHelper.authorize(request.context, DESCRIBE, CLUSTER,
CLUSTER_NAME))
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index edda7a9..a2cc596 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -44,6 +44,7 @@ import
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt
import
org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import
org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition,
ListOffsetsTopic}
+import org.apache.kafka.common.message.MetadataRequestData
import
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
import
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic
import
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection
@@ -1854,16 +1855,61 @@ class AuthorizerIntegrationTest extends BaseRequestTest
{
}
@Test
+ def testMetadataClusterAuthorizedOperationsWithoutDescribeCluster(): Unit = {
+ removeAllClientAcls()
+
+ // MetadataRequest versions older than 1 are not supported.
+ for (version <- 1 to ApiKeys.METADATA.latestVersion) {
+ testMetadataClusterClusterAuthorizedOperations(version.toShort, 0)
+ }
+ }
+
+ @Test
+ def testMetadataClusterAuthorizedOperationsWithDescribeAndAlterCluster():
Unit = {
+ removeAllClientAcls()
+
+ val clusterResource = new ResourcePattern(ResourceType.CLUSTER,
Resource.CLUSTER_NAME, PatternType.LITERAL)
+ val acls = Set(
+ new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE,
ALLOW),
+ new AccessControlEntry(clientPrincipalString, WildcardHost, ALTER, ALLOW)
+ )
+ addAndVerifyAcls(acls, clusterResource)
+
+ val expectedClusterAuthorizedOperations = Utils.to32BitField(
+ acls.map(_.operation.code.asInstanceOf[JByte]).asJava)
+
+ // MetadataRequest versions older than 1 are not supported.
+ for (version <- 1 to ApiKeys.METADATA.latestVersion) {
+ testMetadataClusterClusterAuthorizedOperations(version.toShort,
expectedClusterAuthorizedOperations)
+ }
+ }
+
+ private def testMetadataClusterClusterAuthorizedOperations(
+ version: Short,
+ expectedClusterAuthorizedOperations: Int
+ ): Unit = {
+ val metadataRequest = new MetadataRequest.Builder(new MetadataRequestData()
+ .setTopics(Collections.emptyList())
+ .setAllowAutoTopicCreation(true)
+ .setIncludeClusterAuthorizedOperations(true))
+ .build(version)
+
+ // The expected value is only verified if the request supports it.
+ if (version >= 8 && version <= 10) {
+ val metadataResponse =
connectAndReceive[MetadataResponse](metadataRequest)
+ assertEquals(expectedClusterAuthorizedOperations,
metadataResponse.data.clusterAuthorizedOperations)
+ } else {
+ assertThrows(classOf[UnsupportedVersionException],
+ () => connectAndReceive[MetadataResponse](metadataRequest))
+ }
+ }
+
+ @Test
def testDescribeClusterClusterAuthorizedOperationsWithoutDescribeCluster():
Unit = {
removeAllClientAcls()
for (version <- ApiKeys.DESCRIBE_CLUSTER.oldestVersion to
ApiKeys.DESCRIBE_CLUSTER.latestVersion) {
- val describeClusterRequest = new DescribeClusterRequest.Builder(new
DescribeClusterRequestData()
- .setIncludeClusterAuthorizedOperations(true))
- .build(version.toShort)
- val describeClusterResponse =
connectAndReceive[DescribeClusterResponse](describeClusterRequest)
-
- assertEquals(0, describeClusterResponse.data.clusterAuthorizedOperations)
+ testDescribeClusterClusterAuthorizedOperations(version.toShort, 0)
}
}
@@ -1882,15 +1928,22 @@ class AuthorizerIntegrationTest extends BaseRequestTest
{
acls.map(_.operation.code.asInstanceOf[JByte]).asJava)
for (version <- ApiKeys.DESCRIBE_CLUSTER.oldestVersion to
ApiKeys.DESCRIBE_CLUSTER.latestVersion) {
- val describeClusterRequest = new DescribeClusterRequest.Builder(new
DescribeClusterRequestData()
- .setIncludeClusterAuthorizedOperations(true))
- .build(version.toShort)
- val describeClusterResponse =
connectAndReceive[DescribeClusterResponse](describeClusterRequest)
-
- assertEquals(expectedClusterAuthorizedOperations,
describeClusterResponse.data.clusterAuthorizedOperations)
+ testDescribeClusterClusterAuthorizedOperations(version.toShort,
expectedClusterAuthorizedOperations)
}
}
+ private def testDescribeClusterClusterAuthorizedOperations(
+ version: Short,
+ expectedClusterAuthorizedOperations: Int
+ ): Unit = {
+ val describeClusterRequest = new DescribeClusterRequest.Builder(new
DescribeClusterRequestData()
+ .setIncludeClusterAuthorizedOperations(true))
+ .build(version)
+
+ val describeClusterResponse =
connectAndReceive[DescribeClusterResponse](describeClusterRequest)
+ assertEquals(expectedClusterAuthorizedOperations,
describeClusterResponse.data.clusterAuthorizedOperations)
+ }
+
def removeAllClientAcls(): Unit = {
val authorizer = servers.head.dataPlaneRequestProcessor.authorizer.get
val aclEntryFilter = new AccessControlEntryFilter(clientPrincipalString,
null, AclOperation.ANY, AclPermissionType.ANY)