This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 3.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 59931cd73b30beabb232e43d4b53886b4c133bf2 Author: David Arthur <[email protected]> AuthorDate: Sat Jul 10 13:45:27 2021 -0400 KAFKA-13057; Add KRaft "broker" to several RPC's listeners (#11012) This patch fixes a few request listener specs. We were missing "broker" for many APIs which are now implemented in KRaft, and there were a couple of cases where we had unnecessarily exposed a controller-only API on the broker. Reviewers: Jason Gustafson <[email protected]> --- .../main/resources/common/message/AddOffsetsToTxnRequest.json | 2 +- .../resources/common/message/AddPartitionsToTxnRequest.json | 2 +- .../resources/common/message/AllocateProducerIdsRequest.json | 2 +- .../main/resources/common/message/AlterConfigsRequest.json | 2 +- .../resources/common/message/CreatePartitionsRequest.json | 2 +- .../main/resources/common/message/ElectLeadersRequest.json | 2 +- clients/src/main/resources/common/message/EndTxnRequest.json | 2 +- .../main/resources/common/message/TxnOffsetCommitRequest.json | 2 +- .../resources/common/message/UnregisterBrokerRequest.json | 2 +- .../main/resources/common/message/UpdateFeaturesRequest.json | 2 +- core/src/main/scala/kafka/server/KafkaApis.scala | 11 +---------- 11 files changed, 11 insertions(+), 20 deletions(-) diff --git a/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json b/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json index 7212a02..ade3fc7 100644 --- a/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json +++ b/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json @@ -16,7 +16,7 @@ { "apiKey": 25, "type": "request", - "listeners": ["zkBroker"], + "listeners": ["zkBroker", "broker"], "name": "AddOffsetsToTxnRequest", // Version 1 is the same as version 0. 
// diff --git a/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json b/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json index 99e72a9..4920da1 100644 --- a/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json +++ b/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json @@ -16,7 +16,7 @@ { "apiKey": 24, "type": "request", - "listeners": ["zkBroker"], + "listeners": ["zkBroker", "broker"], "name": "AddPartitionsToTxnRequest", // Version 1 is the same as version 0. // diff --git a/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json b/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json index 6f37313..7256c6b 100644 --- a/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json +++ b/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json @@ -16,7 +16,7 @@ { "apiKey": 67, "type": "request", - "listeners": ["zkBroker", "broker", "controller"], + "listeners": ["zkBroker", "controller"], "name": "AllocateProducerIdsRequest", "validVersions": "0", "flexibleVersions": "0+", diff --git a/clients/src/main/resources/common/message/AlterConfigsRequest.json b/clients/src/main/resources/common/message/AlterConfigsRequest.json index fa46656..4c28ee1 100644 --- a/clients/src/main/resources/common/message/AlterConfigsRequest.json +++ b/clients/src/main/resources/common/message/AlterConfigsRequest.json @@ -16,7 +16,7 @@ { "apiKey": 33, "type": "request", - "listeners": ["zkBroker"], + "listeners": ["zkBroker", "broker"], "name": "AlterConfigsRequest", // Version 1 is the same as version 0. // Version 2 enables flexible versions. 
diff --git a/clients/src/main/resources/common/message/CreatePartitionsRequest.json b/clients/src/main/resources/common/message/CreatePartitionsRequest.json index 8053628..6e24949 100644 --- a/clients/src/main/resources/common/message/CreatePartitionsRequest.json +++ b/clients/src/main/resources/common/message/CreatePartitionsRequest.json @@ -16,7 +16,7 @@ { "apiKey": 37, "type": "request", - "listeners": ["zkBroker"], + "listeners": ["zkBroker", "broker", "controller"], "name": "CreatePartitionsRequest", // Version 1 is the same as version 0. // diff --git a/clients/src/main/resources/common/message/ElectLeadersRequest.json b/clients/src/main/resources/common/message/ElectLeadersRequest.json index d407f5e..dd9fa21 100644 --- a/clients/src/main/resources/common/message/ElectLeadersRequest.json +++ b/clients/src/main/resources/common/message/ElectLeadersRequest.json @@ -16,7 +16,7 @@ { "apiKey": 43, "type": "request", - "listeners": ["zkBroker"], + "listeners": ["zkBroker", "broker", "controller"], "name": "ElectLeadersRequest", // Version 1 implements multiple leader election types, as described by KIP-460. // diff --git a/clients/src/main/resources/common/message/EndTxnRequest.json b/clients/src/main/resources/common/message/EndTxnRequest.json index 7e7d41d..f16ef76 100644 --- a/clients/src/main/resources/common/message/EndTxnRequest.json +++ b/clients/src/main/resources/common/message/EndTxnRequest.json @@ -16,7 +16,7 @@ { "apiKey": 26, "type": "request", - "listeners": ["zkBroker"], + "listeners": ["zkBroker", "broker"], "name": "EndTxnRequest", // Version 1 is the same as version 0. 
// diff --git a/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json b/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json index 127ff3d..a832ef7 100644 --- a/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json +++ b/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json @@ -16,7 +16,7 @@ { "apiKey": 28, "type": "request", - "listeners": ["zkBroker"], + "listeners": ["zkBroker", "broker"], "name": "TxnOffsetCommitRequest", // Version 1 is the same as version 0. // diff --git a/clients/src/main/resources/common/message/UnregisterBrokerRequest.json b/clients/src/main/resources/common/message/UnregisterBrokerRequest.json index 05fd315..4fb8d8d 100644 --- a/clients/src/main/resources/common/message/UnregisterBrokerRequest.json +++ b/clients/src/main/resources/common/message/UnregisterBrokerRequest.json @@ -16,7 +16,7 @@ { "apiKey": 64, "type": "request", - "listeners": ["broker", "controller"], + "listeners": ["controller"], "name": "UnregisterBrokerRequest", "validVersions": "0", "flexibleVersions": "0+", diff --git a/clients/src/main/resources/common/message/UpdateFeaturesRequest.json b/clients/src/main/resources/common/message/UpdateFeaturesRequest.json index 41be8cf..2b31813 100644 --- a/clients/src/main/resources/common/message/UpdateFeaturesRequest.json +++ b/clients/src/main/resources/common/message/UpdateFeaturesRequest.json @@ -16,7 +16,7 @@ { "apiKey": 57, "type": "request", - "listeners": ["zkBroker"], + "listeners": ["zkBroker", "broker"], "name": "UpdateFeaturesRequest", "validVersions": "0", "flexibleVersions": "0+", diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 6bf1a0e..a220940 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -223,10 +223,9 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.ENVELOPE => handleEnvelope(request, 
requestLocal) case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request) case ApiKeys.DESCRIBE_PRODUCERS => handleDescribeProducersRequest(request) - case ApiKeys.UNREGISTER_BROKER => maybeForwardToController(request, handleUnregisterBrokerRequest) case ApiKeys.DESCRIBE_TRANSACTIONS => handleDescribeTransactionsRequest(request) case ApiKeys.LIST_TRANSACTIONS => handleListTransactionsRequest(request) - case ApiKeys.ALLOCATE_PRODUCER_IDS => maybeForwardToController(request, handleAllocateProducerIdsRequest) + case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request) case ApiKeys.DESCRIBE_QUORUM => forwardToControllerOrFail(request) case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}") } @@ -3337,14 +3336,6 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def handleUnregisterBrokerRequest(request: RequestChannel.Request): Unit = { - // This function will not be called when in KRaft mode, since the - // UNREGISTER_BROKER API is marked as forwardable and we will always have a forwarding - // manager. - throw new UnsupportedVersionException("The broker unregistration API is not available when using " + - "Apache ZooKeeper mode.") - } - def handleDescribeTransactionsRequest(request: RequestChannel.Request): Unit = { val describeTransactionsRequest = request.body[DescribeTransactionsRequest] val response = new DescribeTransactionsResponseData()
