This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new f0938bf  KAFKA-13143; Remove Metadata handling from ControllerApis 
(#11135)
f0938bf is described below

commit f0938bf82ecbc8e24e80fb0c25fd0b6f62be6bf6
Author: Niket <[email protected]>
AuthorDate: Thu Jul 29 09:23:47 2021 -0700

    KAFKA-13143; Remove Metadata handling from ControllerApis (#11135)
    
    This PR removes the `METADATA` API from the Kraft controller as the 
controller does not yet implement the metadata fetch functionality completely.
    
    Without the change (as per the JIRA 
https://issues.apache.org/jira/browse/KAFKA-13143), the API would return an 
empty list of topics making the caller incorrectly think that there were no 
topics in the cluster which could be confusing. After this change the describe 
and list topic APIs timeout on the controller endpoint when using the 
`kafka-topics` CLI (which is the same behavior as create_topic).
    
    Reviewers: Luke Chen <[email protected]>, José Armando García Sancio 
<[email protected]>, Jason Gustafson <[email protected]>
---
 .../resources/common/message/MetadataRequest.json  |  2 +-
 .../main/scala/kafka/server/ControllerApis.scala   | 38 ----------------------
 2 files changed, 1 insertion(+), 39 deletions(-)

diff --git a/clients/src/main/resources/common/message/MetadataRequest.json 
b/clients/src/main/resources/common/message/MetadataRequest.json
index a1634b1..2d88a0d 100644
--- a/clients/src/main/resources/common/message/MetadataRequest.json
+++ b/clients/src/main/resources/common/message/MetadataRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 3,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["zkBroker", "broker"],
   "name": "MetadataRequest",
   "validVersions": "0-11",
   "flexibleVersions": "9+",
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala 
b/core/src/main/scala/kafka/server/ControllerApis.scala
index 35028b4..2f86e1f 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -40,12 +40,10 @@ import 
org.apache.kafka.common.message.CreateTopicsRequestData
 import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
 import 
org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, 
DeletableTopicResultCollection}
 import 
org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse
-import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker
 import org.apache.kafka.common.message._
 import org.apache.kafka.common.protocol.Errors._
 import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
 import org.apache.kafka.common.requests._
-import org.apache.kafka.common.resource.Resource
 import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
 import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC}
 import org.apache.kafka.common.utils.Time
@@ -86,7 +84,6 @@ class ControllerApis(val requestChannel: RequestChannel,
       request.header.apiKey match {
         case ApiKeys.FETCH => handleFetch(request)
         case ApiKeys.FETCH_SNAPSHOT => handleFetchSnapshot(request)
-        case ApiKeys.METADATA => handleMetadataRequest(request)
         case ApiKeys.CREATE_TOPICS => handleCreateTopics(request)
         case ApiKeys.DELETE_TOPICS => handleDeleteTopics(request)
         case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
@@ -151,41 +148,6 @@ class ControllerApis(val requestChannel: RequestChannel,
     handleRaftRequest(request, response => new 
FetchSnapshotResponse(response.asInstanceOf[FetchSnapshotResponseData]))
   }
 
-  def handleMetadataRequest(request: RequestChannel.Request): Unit = {
-    val metadataRequest = request.body[MetadataRequest]
-    def createResponseCallback(requestThrottleMs: Int): MetadataResponse = {
-      val metadataResponseData = new MetadataResponseData()
-      metadataResponseData.setThrottleTimeMs(requestThrottleMs)
-      controllerNodes.foreach { node =>
-        metadataResponseData.brokers.add(new MetadataResponseBroker()
-          .setHost(node.host)
-          .setNodeId(node.id)
-          .setPort(node.port)
-          .setRack(node.rack))
-      }
-      metadataResponseData.setClusterId(metaProperties.clusterId)
-      if (controller.isActive) {
-        metadataResponseData.setControllerId(config.nodeId)
-      } else {
-        metadataResponseData.setControllerId(MetadataResponse.NO_CONTROLLER_ID)
-      }
-      val clusterAuthorizedOperations = if 
(metadataRequest.data.includeClusterAuthorizedOperations) {
-        if (authHelper.authorize(request.context, DESCRIBE, CLUSTER, 
CLUSTER_NAME)) {
-          authHelper.authorizedOperations(request, Resource.CLUSTER)
-        } else {
-          0
-        }
-      } else {
-        Int.MinValue
-      }
-      // TODO: fill in information about the metadata topic
-      
metadataResponseData.setClusterAuthorizedOperations(clusterAuthorizedOperations)
-      new MetadataResponse(metadataResponseData, request.header.apiVersion)
-    }
-    requestHelper.sendResponseMaybeThrottle(request,
-      requestThrottleMs => createResponseCallback(requestThrottleMs))
-  }
-
   def handleDeleteTopics(request: RequestChannel.Request): Unit = {
     val deleteTopicsRequest = request.body[DeleteTopicsRequest]
     val future = deleteTopics(deleteTopicsRequest.data,

Reply via email to