This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new f0938bf KAFKA-13143; Remove Metadata handling from ControllerApis
(#11135)
f0938bf is described below
commit f0938bf82ecbc8e24e80fb0c25fd0b6f62be6bf6
Author: Niket <[email protected]>
AuthorDate: Thu Jul 29 09:23:47 2021 -0700
KAFKA-13143; Remove Metadata handling from ControllerApis (#11135)
This PR removes the `METADATA` API from the Kraft controller as the
controller does not yet implement the metadata fetch functionality completely.
Without the change (as per the JIRA
https://issues.apache.org/jira/browse/KAFKA-13143), the API would return an
empty list of topics making the caller incorrectly think that there were no
topics in the cluster which could be confusing. After this change the describe
and list topic APIs timeout on the controller endpoint when using the
`kafka-topics` CLI (which is the same behavior as create_topic).
Reviewers: Luke Chen <[email protected]>, José Armando García Sancio
<[email protected]>, Jason Gustafson <[email protected]>
---
.../resources/common/message/MetadataRequest.json | 2 +-
.../main/scala/kafka/server/ControllerApis.scala | 38 ----------------------
2 files changed, 1 insertion(+), 39 deletions(-)
diff --git a/clients/src/main/resources/common/message/MetadataRequest.json
b/clients/src/main/resources/common/message/MetadataRequest.json
index a1634b1..2d88a0d 100644
--- a/clients/src/main/resources/common/message/MetadataRequest.json
+++ b/clients/src/main/resources/common/message/MetadataRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 3,
"type": "request",
- "listeners": ["zkBroker", "broker", "controller"],
+ "listeners": ["zkBroker", "broker"],
"name": "MetadataRequest",
"validVersions": "0-11",
"flexibleVersions": "9+",
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala
b/core/src/main/scala/kafka/server/ControllerApis.scala
index 35028b4..2f86e1f 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -40,12 +40,10 @@ import
org.apache.kafka.common.message.CreateTopicsRequestData
import
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
import
org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult,
DeletableTopicResultCollection}
import
org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse
-import
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker
import org.apache.kafka.common.message._
import org.apache.kafka.common.protocol.Errors._
import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
import org.apache.kafka.common.requests._
-import org.apache.kafka.common.resource.Resource
import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC}
import org.apache.kafka.common.utils.Time
@@ -86,7 +84,6 @@ class ControllerApis(val requestChannel: RequestChannel,
request.header.apiKey match {
case ApiKeys.FETCH => handleFetch(request)
case ApiKeys.FETCH_SNAPSHOT => handleFetchSnapshot(request)
- case ApiKeys.METADATA => handleMetadataRequest(request)
case ApiKeys.CREATE_TOPICS => handleCreateTopics(request)
case ApiKeys.DELETE_TOPICS => handleDeleteTopics(request)
case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
@@ -151,41 +148,6 @@ class ControllerApis(val requestChannel: RequestChannel,
handleRaftRequest(request, response => new
FetchSnapshotResponse(response.asInstanceOf[FetchSnapshotResponseData]))
}
- def handleMetadataRequest(request: RequestChannel.Request): Unit = {
- val metadataRequest = request.body[MetadataRequest]
- def createResponseCallback(requestThrottleMs: Int): MetadataResponse = {
- val metadataResponseData = new MetadataResponseData()
- metadataResponseData.setThrottleTimeMs(requestThrottleMs)
- controllerNodes.foreach { node =>
- metadataResponseData.brokers.add(new MetadataResponseBroker()
- .setHost(node.host)
- .setNodeId(node.id)
- .setPort(node.port)
- .setRack(node.rack))
- }
- metadataResponseData.setClusterId(metaProperties.clusterId)
- if (controller.isActive) {
- metadataResponseData.setControllerId(config.nodeId)
- } else {
- metadataResponseData.setControllerId(MetadataResponse.NO_CONTROLLER_ID)
- }
- val clusterAuthorizedOperations = if
(metadataRequest.data.includeClusterAuthorizedOperations) {
- if (authHelper.authorize(request.context, DESCRIBE, CLUSTER,
CLUSTER_NAME)) {
- authHelper.authorizedOperations(request, Resource.CLUSTER)
- } else {
- 0
- }
- } else {
- Int.MinValue
- }
- // TODO: fill in information about the metadata topic
-
metadataResponseData.setClusterAuthorizedOperations(clusterAuthorizedOperations)
- new MetadataResponse(metadataResponseData, request.header.apiVersion)
- }
- requestHelper.sendResponseMaybeThrottle(request,
- requestThrottleMs => createResponseCallback(requestThrottleMs))
- }
-
def handleDeleteTopics(request: RequestChannel.Request): Unit = {
val deleteTopicsRequest = request.body[DeleteTopicsRequest]
val future = deleteTopics(deleteTopicsRequest.data,