This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 59931cd73b30beabb232e43d4b53886b4c133bf2
Author: David Arthur <[email protected]>
AuthorDate: Sat Jul 10 13:45:27 2021 -0400

    KAFKA-13057; Add KRaft "broker" to several RPCs' listeners (#11012)
    
    This patch fixes a few request listener specs. We were missing "broker" for
    many APIs which are now implemented in KRaft, and there were a couple of cases
    where we had unnecessarily exposed a controller-only API on the broker.
    
    Reviewers: Jason Gustafson <[email protected]>
---
 .../main/resources/common/message/AddOffsetsToTxnRequest.json |  2 +-
 .../resources/common/message/AddPartitionsToTxnRequest.json   |  2 +-
 .../resources/common/message/AllocateProducerIdsRequest.json  |  2 +-
 .../main/resources/common/message/AlterConfigsRequest.json    |  2 +-
 .../resources/common/message/CreatePartitionsRequest.json     |  2 +-
 .../main/resources/common/message/ElectLeadersRequest.json    |  2 +-
 clients/src/main/resources/common/message/EndTxnRequest.json  |  2 +-
 .../main/resources/common/message/TxnOffsetCommitRequest.json |  2 +-
 .../resources/common/message/UnregisterBrokerRequest.json     |  2 +-
 .../main/resources/common/message/UpdateFeaturesRequest.json  |  2 +-
 core/src/main/scala/kafka/server/KafkaApis.scala              | 11 +----------
 11 files changed, 11 insertions(+), 20 deletions(-)

diff --git 
a/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json 
b/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json
index 7212a02..ade3fc7 100644
--- a/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json
+++ b/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 25,
   "type": "request",
-  "listeners": ["zkBroker"],
+  "listeners": ["zkBroker", "broker"],
   "name": "AddOffsetsToTxnRequest",
   // Version 1 is the same as version 0.
   //
diff --git 
a/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json 
b/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json
index 99e72a9..4920da1 100644
--- a/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json
+++ b/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 24,
   "type": "request",
-  "listeners": ["zkBroker"],
+  "listeners": ["zkBroker", "broker"],
   "name": "AddPartitionsToTxnRequest",
   // Version 1 is the same as version 0.
   //
diff --git 
a/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json 
b/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json
index 6f37313..7256c6b 100644
--- a/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json
+++ b/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 67,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["zkBroker", "controller"],
   "name": "AllocateProducerIdsRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git a/clients/src/main/resources/common/message/AlterConfigsRequest.json 
b/clients/src/main/resources/common/message/AlterConfigsRequest.json
index fa46656..4c28ee1 100644
--- a/clients/src/main/resources/common/message/AlterConfigsRequest.json
+++ b/clients/src/main/resources/common/message/AlterConfigsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 33,
   "type": "request",
-  "listeners": ["zkBroker"],
+  "listeners": ["zkBroker", "broker"],
   "name": "AlterConfigsRequest",
   // Version 1 is the same as version 0.
   // Version 2 enables flexible versions.
diff --git 
a/clients/src/main/resources/common/message/CreatePartitionsRequest.json 
b/clients/src/main/resources/common/message/CreatePartitionsRequest.json
index 8053628..6e24949 100644
--- a/clients/src/main/resources/common/message/CreatePartitionsRequest.json
+++ b/clients/src/main/resources/common/message/CreatePartitionsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 37,
   "type": "request",
-  "listeners": ["zkBroker"],
+  "listeners": ["zkBroker", "broker", "controller"],
   "name": "CreatePartitionsRequest",
   // Version 1 is the same as version 0.
   //
diff --git a/clients/src/main/resources/common/message/ElectLeadersRequest.json 
b/clients/src/main/resources/common/message/ElectLeadersRequest.json
index d407f5e..dd9fa21 100644
--- a/clients/src/main/resources/common/message/ElectLeadersRequest.json
+++ b/clients/src/main/resources/common/message/ElectLeadersRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 43,
   "type": "request",
-  "listeners": ["zkBroker"],
+  "listeners": ["zkBroker", "broker", "controller"],
   "name": "ElectLeadersRequest",
   // Version 1 implements multiple leader election types, as described by 
KIP-460.
   //
diff --git a/clients/src/main/resources/common/message/EndTxnRequest.json 
b/clients/src/main/resources/common/message/EndTxnRequest.json
index 7e7d41d..f16ef76 100644
--- a/clients/src/main/resources/common/message/EndTxnRequest.json
+++ b/clients/src/main/resources/common/message/EndTxnRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 26,
   "type": "request",
-  "listeners": ["zkBroker"],
+  "listeners": ["zkBroker", "broker"],
   "name": "EndTxnRequest",
   // Version 1 is the same as version 0.
   //
diff --git 
a/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json 
b/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json
index 127ff3d..a832ef7 100644
--- a/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json
+++ b/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 28,
   "type": "request",
-  "listeners": ["zkBroker"],
+  "listeners": ["zkBroker", "broker"],
   "name": "TxnOffsetCommitRequest",
   // Version 1 is the same as version 0.
   //
diff --git 
a/clients/src/main/resources/common/message/UnregisterBrokerRequest.json 
b/clients/src/main/resources/common/message/UnregisterBrokerRequest.json
index 05fd315..4fb8d8d 100644
--- a/clients/src/main/resources/common/message/UnregisterBrokerRequest.json
+++ b/clients/src/main/resources/common/message/UnregisterBrokerRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 64,
   "type": "request",
-  "listeners": ["broker", "controller"],
+  "listeners": ["controller"],
   "name": "UnregisterBrokerRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git 
a/clients/src/main/resources/common/message/UpdateFeaturesRequest.json 
b/clients/src/main/resources/common/message/UpdateFeaturesRequest.json
index 41be8cf..2b31813 100644
--- a/clients/src/main/resources/common/message/UpdateFeaturesRequest.json
+++ b/clients/src/main/resources/common/message/UpdateFeaturesRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 57,
   "type": "request",
-  "listeners": ["zkBroker"],
+  "listeners": ["zkBroker", "broker"],
   "name": "UpdateFeaturesRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6bf1a0e..a220940 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -223,10 +223,9 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.ENVELOPE => handleEnvelope(request, requestLocal)
         case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
         case ApiKeys.DESCRIBE_PRODUCERS => 
handleDescribeProducersRequest(request)
-        case ApiKeys.UNREGISTER_BROKER => maybeForwardToController(request, 
handleUnregisterBrokerRequest)
         case ApiKeys.DESCRIBE_TRANSACTIONS => 
handleDescribeTransactionsRequest(request)
         case ApiKeys.LIST_TRANSACTIONS => 
handleListTransactionsRequest(request)
-        case ApiKeys.ALLOCATE_PRODUCER_IDS => 
maybeForwardToController(request, handleAllocateProducerIdsRequest)
+        case ApiKeys.ALLOCATE_PRODUCER_IDS => 
handleAllocateProducerIdsRequest(request)
         case ApiKeys.DESCRIBE_QUORUM => forwardToControllerOrFail(request)
         case _ => throw new IllegalStateException(s"No handler for request api 
key ${request.header.apiKey}")
       }
@@ -3337,14 +3336,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleUnregisterBrokerRequest(request: RequestChannel.Request): Unit = {
-    // This function will not be called when in KRaft mode, since the
-    // UNREGISTER_BROKER API is marked as forwardable and we will always have 
a forwarding
-    // manager.
-    throw new UnsupportedVersionException("The broker unregistration API is 
not available when using " +
-      "Apache ZooKeeper mode.")
-  }
-
   def handleDescribeTransactionsRequest(request: RequestChannel.Request): Unit 
= {
     val describeTransactionsRequest = request.body[DescribeTransactionsRequest]
     val response = new DescribeTransactionsResponseData()

Reply via email to