This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 9c996f7598840105defb1f740ebbd95bf710cd91
Author: Ken Huang <[email protected]>
AuthorDate: Fri Jan 24 21:53:32 2025 +0800

    KAFKA-18474: Remove zkBroker listener (#18477)
    
    Reviewers: Ismael Juma <[email protected]>, Chia-Ping Tsai 
<[email protected]>, PoAn Yang <[email protected]>
---
 .../org/apache/kafka/common/protocol/ApiKeys.java  |  8 ++---
 .../common/message/AddOffsetsToTxnRequest.json     |  2 +-
 .../common/message/AddPartitionsToTxnRequest.json  |  2 +-
 .../common/message/AllocateProducerIdsRequest.json |  2 +-
 .../common/message/AlterClientQuotasRequest.json   |  2 +-
 .../common/message/AlterConfigsRequest.json        |  2 +-
 .../AlterPartitionReassignmentsRequest.json        |  2 +-
 .../common/message/AlterPartitionRequest.json      |  2 +-
 .../common/message/AlterReplicaLogDirsRequest.json |  2 +-
 .../message/AlterUserScramCredentialsRequest.json  |  2 +-
 .../common/message/ApiVersionsRequest.json         |  2 +-
 .../common/message/CreateAclsRequest.json          |  2 +-
 .../message/CreateDelegationTokenRequest.json      |  2 +-
 .../common/message/CreatePartitionsRequest.json    |  2 +-
 .../common/message/CreateTopicsRequest.json        |  2 +-
 .../common/message/DeleteAclsRequest.json          |  2 +-
 .../common/message/DeleteGroupsRequest.json        |  2 +-
 .../common/message/DeleteRecordsRequest.json       |  2 +-
 .../common/message/DeleteTopicsRequest.json        |  2 +-
 .../common/message/DescribeAclsRequest.json        |  2 +-
 .../message/DescribeClientQuotasRequest.json       |  2 +-
 .../common/message/DescribeClusterRequest.json     |  2 +-
 .../common/message/DescribeConfigsRequest.json     |  2 +-
 .../message/DescribeDelegationTokenRequest.json    |  2 +-
 .../common/message/DescribeGroupsRequest.json      |  2 +-
 .../common/message/DescribeLogDirsRequest.json     |  2 +-
 .../common/message/DescribeProducersRequest.json   |  2 +-
 .../message/DescribeTransactionsRequest.json       |  2 +-
 .../DescribeUserScramCredentialsRequest.json       |  2 +-
 .../common/message/ElectLeadersRequest.json        |  2 +-
 .../resources/common/message/EndTxnRequest.json    |  2 +-
 .../resources/common/message/EnvelopeRequest.json  |  2 +-
 .../message/ExpireDelegationTokenRequest.json      |  2 +-
 .../resources/common/message/FetchRequest.json     |  2 +-
 .../common/message/FindCoordinatorRequest.json     |  2 +-
 .../resources/common/message/HeartbeatRequest.json |  2 +-
 .../message/IncrementalAlterConfigsRequest.json    |  2 +-
 .../common/message/InitProducerIdRequest.json      |  2 +-
 .../resources/common/message/JoinGroupRequest.json |  2 +-
 .../common/message/LeaveGroupRequest.json          |  2 +-
 .../common/message/ListGroupsRequest.json          |  2 +-
 .../common/message/ListOffsetsRequest.json         |  2 +-
 .../message/ListPartitionReassignmentsRequest.json |  2 +-
 .../common/message/ListTransactionsRequest.json    |  2 +-
 .../resources/common/message/MetadataRequest.json  |  2 +-
 .../common/message/OffsetCommitRequest.json        |  2 +-
 .../common/message/OffsetDeleteRequest.json        |  2 +-
 .../common/message/OffsetFetchRequest.json         |  2 +-
 .../message/OffsetForLeaderEpochRequest.json       |  2 +-
 .../resources/common/message/ProduceRequest.json   |  2 +-
 .../message/RenewDelegationTokenRequest.json       |  2 +-
 .../common/message/SaslAuthenticateRequest.json    |  2 +-
 .../common/message/SaslHandshakeRequest.json       |  2 +-
 .../resources/common/message/SyncGroupRequest.json |  2 +-
 .../common/message/TxnOffsetCommitRequest.json     |  2 +-
 .../common/message/UpdateFeaturesRequest.json      |  2 +-
 .../common/message/WriteTxnMarkersRequest.json     |  2 +-
 .../apache/kafka/clients/NetworkClientTest.java    |  4 +--
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  2 +-
 .../internals/FetchRequestManagerTest.java         |  2 +-
 .../clients/consumer/internals/FetcherTest.java    |  2 +-
 .../clients/producer/internals/SenderTest.java     |  2 +-
 .../apache/kafka/common/network/NioEchoServer.java |  2 +-
 .../common/network/SaslChannelBuilderTest.java     |  2 +-
 .../common/network/SslTransportLayerTest.java      |  2 +-
 .../common/requests/ApiVersionsResponseTest.java   | 42 +++++++++++-----------
 .../kafka/common/requests/RequestResponseTest.java |  4 +--
 .../authenticator/SaslAuthenticatorTest.java       |  6 ++--
 .../authenticator/SaslServerAuthenticatorTest.java |  2 +-
 .../kafka/server/GssapiAuthenticationTest.scala    |  2 +-
 .../unit/kafka/server/ApiVersionManagerTest.scala  |  2 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  6 ++--
 .../apache/kafka/message/RequestListenerType.java  |  2 --
 73 files changed, 99 insertions(+), 105 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 6afadb8c58e..51510edb5d8 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -328,11 +328,7 @@ public enum ApiKeys {
         return hasBuffer.get();
     }
 
-    public static EnumSet<ApiKeys> zkBrokerApis() {
-        return apisForListener(ApiMessageType.ListenerType.ZK_BROKER);
-    }
-
-    public static EnumSet<ApiKeys> kraftBrokerApis() {
+    public static EnumSet<ApiKeys> brokerApis() {
         return apisForListener(ApiMessageType.ListenerType.BROKER);
     }
 
@@ -342,7 +338,7 @@ public enum ApiKeys {
 
     public static EnumSet<ApiKeys> clientApis() {
         List<ApiKeys> apis = Arrays.stream(ApiKeys.values())
-            .filter(apiKey -> 
apiKey.inScope(ApiMessageType.ListenerType.ZK_BROKER) || 
apiKey.inScope(ApiMessageType.ListenerType.BROKER))
+            .filter(apiKey -> 
apiKey.inScope(ApiMessageType.ListenerType.BROKER))
             .collect(Collectors.toList());
         return EnumSet.copyOf(apis);
     }
diff --git 
a/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json 
b/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json
index 157ae20c0a4..9bebc8366cf 100644
--- a/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json
+++ b/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 25,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["broker"],
   "name": "AddOffsetsToTxnRequest",
   // Version 1 is the same as version 0.
   //
diff --git 
a/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json 
b/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json
index 2ed84be2180..68a45cdd0ac 100644
--- a/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json
+++ b/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 24,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["broker"],
   "name": "AddPartitionsToTxnRequest",
   // Version 1 is the same as version 0.
   //
diff --git 
a/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json 
b/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json
index e8271c60321..9447b080f84 100644
--- a/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json
+++ b/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 67,
   "type": "request",
-  "listeners": ["zkBroker", "controller"],
+  "listeners": ["controller"],
   "name": "AllocateProducerIdsRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git 
a/clients/src/main/resources/common/message/AlterClientQuotasRequest.json 
b/clients/src/main/resources/common/message/AlterClientQuotasRequest.json
index 6bfdc925c29..93524a80e21 100644
--- a/clients/src/main/resources/common/message/AlterClientQuotasRequest.json
+++ b/clients/src/main/resources/common/message/AlterClientQuotasRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 49,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["broker", "controller"],
   "name": "AlterClientQuotasRequest",
   "validVersions": "0-1",
   // Version 1 enables flexible versions.
diff --git a/clients/src/main/resources/common/message/AlterConfigsRequest.json 
b/clients/src/main/resources/common/message/AlterConfigsRequest.json
index 31057e3410a..b87091f2e5c 100644
--- a/clients/src/main/resources/common/message/AlterConfigsRequest.json
+++ b/clients/src/main/resources/common/message/AlterConfigsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 33,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["broker", "controller"],
   "name": "AlterConfigsRequest",
   // Version 1 is the same as version 0.
   // Version 2 enables flexible versions.
diff --git 
a/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json
 
b/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json
index 47043ff02d0..f3047feb0a3 100644
--- 
a/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json
+++ 
b/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 45,
   "type": "request",
-  "listeners": ["broker", "controller", "zkBroker"],
+  "listeners": ["broker", "controller"],
   "name": "AlterPartitionReassignmentsRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git 
a/clients/src/main/resources/common/message/AlterPartitionRequest.json 
b/clients/src/main/resources/common/message/AlterPartitionRequest.json
index 954b98578af..d22f3eb13ad 100644
--- a/clients/src/main/resources/common/message/AlterPartitionRequest.json
+++ b/clients/src/main/resources/common/message/AlterPartitionRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 56,
   "type": "request",
-  "listeners": ["zkBroker", "controller"],
+  "listeners": ["controller"],
   "name": "AlterPartitionRequest",
   // Version 1 adds LeaderRecoveryState field (KIP-704).
   //
diff --git 
a/clients/src/main/resources/common/message/AlterReplicaLogDirsRequest.json 
b/clients/src/main/resources/common/message/AlterReplicaLogDirsRequest.json
index b309243fb62..42ef6693325 100644
--- a/clients/src/main/resources/common/message/AlterReplicaLogDirsRequest.json
+++ b/clients/src/main/resources/common/message/AlterReplicaLogDirsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 34,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["broker"],
   "name": "AlterReplicaLogDirsRequest",
   // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
   //
diff --git 
a/clients/src/main/resources/common/message/AlterUserScramCredentialsRequest.json
 
b/clients/src/main/resources/common/message/AlterUserScramCredentialsRequest.json
index 8937394ef6e..ea687072f16 100644
--- 
a/clients/src/main/resources/common/message/AlterUserScramCredentialsRequest.json
+++ 
b/clients/src/main/resources/common/message/AlterUserScramCredentialsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 51,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["broker", "controller"],
   "name": "AlterUserScramCredentialsRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git a/clients/src/main/resources/common/message/ApiVersionsRequest.json 
b/clients/src/main/resources/common/message/ApiVersionsRequest.json
index 050dbcfd3f2..56170c96673 100644
--- a/clients/src/main/resources/common/message/ApiVersionsRequest.json
+++ b/clients/src/main/resources/common/message/ApiVersionsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 18,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["broker", "controller"],
   "name": "ApiVersionsRequest",
   // Versions 0 through 2 of ApiVersionsRequest are the same.
   //
diff --git a/clients/src/main/resources/common/message/CreateAclsRequest.json 
b/clients/src/main/resources/common/message/CreateAclsRequest.json
index 0f458203074..d3a028b0536 100644
--- a/clients/src/main/resources/common/message/CreateAclsRequest.json
+++ b/clients/src/main/resources/common/message/CreateAclsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 30,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["broker", "controller"],
   "name": "CreateAclsRequest",
   // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
   // Version 1 adds resource pattern type.
diff --git 
a/clients/src/main/resources/common/message/CreateDelegationTokenRequest.json 
b/clients/src/main/resources/common/message/CreateDelegationTokenRequest.json
index 276cbe57901..f4c586cbcad 100644
--- 
a/clients/src/main/resources/common/message/CreateDelegationTokenRequest.json
+++ 
b/clients/src/main/resources/common/message/CreateDelegationTokenRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 38,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["broker", "controller"],
   "name": "CreateDelegationTokenRequest",
   // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
   //
diff --git 
a/clients/src/main/resources/common/message/CreatePartitionsRequest.json 
b/clients/src/main/resources/common/message/CreatePartitionsRequest.json
index 6e249498659..95552a080b9 100644
--- a/clients/src/main/resources/common/message/CreatePartitionsRequest.json
+++ b/clients/src/main/resources/common/message/CreatePartitionsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 37,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["broker", "controller"],
   "name": "CreatePartitionsRequest",
   // Version 1 is the same as version 0.
   //
diff --git a/clients/src/main/resources/common/message/CreateTopicsRequest.json 
b/clients/src/main/resources/common/message/CreateTopicsRequest.json
index 9aed4d236db..3ee03933572 100644
--- a/clients/src/main/resources/common/message/CreateTopicsRequest.json
+++ b/clients/src/main/resources/common/message/CreateTopicsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 19,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["broker", "controller"],
   "name": "CreateTopicsRequest",
   // Versions 0-1 were removed in Apache Kafka 4.0, Version 2 is the new 
baseline.
   //
diff --git a/clients/src/main/resources/common/message/DeleteAclsRequest.json 
b/clients/src/main/resources/common/message/DeleteAclsRequest.json
index b430364d861..db605305ae2 100644
--- a/clients/src/main/resources/common/message/DeleteAclsRequest.json
+++ b/clients/src/main/resources/common/message/DeleteAclsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 31,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["broker", "controller"],
   "name": "DeleteAclsRequest",
   // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
   // Version 1 adds the pattern type.
diff --git a/clients/src/main/resources/common/message/DeleteGroupsRequest.json 
b/clients/src/main/resources/common/message/DeleteGroupsRequest.json
index 1ac6a053e63..7d7c4371789 100644
--- a/clients/src/main/resources/common/message/DeleteGroupsRequest.json
+++ b/clients/src/main/resources/common/message/DeleteGroupsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 42,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["broker"],
   "name": "DeleteGroupsRequest",
   // Version 1 is the same as version 0.
   //
diff --git 
a/clients/src/main/resources/common/message/DeleteRecordsRequest.json 
b/clients/src/main/resources/common/message/DeleteRecordsRequest.json
index 06a12d85c8b..fc697944a02 100644
--- a/clients/src/main/resources/common/message/DeleteRecordsRequest.json
+++ b/clients/src/main/resources/common/message/DeleteRecordsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 21,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["broker"],
   "name": "DeleteRecordsRequest",
   // Version 1 is the same as version 0.
 
diff --git a/clients/src/main/resources/common/message/DeleteTopicsRequest.json 
b/clients/src/main/resources/common/message/DeleteTopicsRequest.json
index 465d9e0b31f..35c9eb28f9c 100644
--- a/clients/src/main/resources/common/message/DeleteTopicsRequest.json
+++ b/clients/src/main/resources/common/message/DeleteTopicsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 20,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["broker", "controller"],
   "name": "DeleteTopicsRequest",
   // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
   // Versions 0, 1, 2, and 3 are the same.
diff --git a/clients/src/main/resources/common/message/DescribeAclsRequest.json 
b/clients/src/main/resources/common/message/DescribeAclsRequest.json
index a9bdfba40e7..23883c154fe 100644
--- a/clients/src/main/resources/common/message/DescribeAclsRequest.json
+++ b/clients/src/main/resources/common/message/DescribeAclsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 29,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["broker", "controller"],
   "name": "DescribeAclsRequest",
   // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
   // Version 1 adds resource pattern type.
diff --git 
a/clients/src/main/resources/common/message/DescribeClientQuotasRequest.json 
b/clients/src/main/resources/common/message/DescribeClientQuotasRequest.json
index d14cfc95733..6644e53343b 100644
--- a/clients/src/main/resources/common/message/DescribeClientQuotasRequest.json
+++ b/clients/src/main/resources/common/message/DescribeClientQuotasRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 48,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["broker"],
   "name": "DescribeClientQuotasRequest",
   // Version 1 enables flexible versions.
   "validVersions": "0-1",
diff --git 
a/clients/src/main/resources/common/message/DescribeClusterRequest.json 
b/clients/src/main/resources/common/message/DescribeClusterRequest.json
index 71e00df09b2..9c17c6b1ba5 100644
--- a/clients/src/main/resources/common/message/DescribeClusterRequest.json
+++ b/clients/src/main/resources/common/message/DescribeClusterRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 60,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["broker", "controller"],
   "name": "DescribeClusterRequest",
   //
   // Version 1 adds EndpointType for KIP-919 support.
diff --git 
a/clients/src/main/resources/common/message/DescribeConfigsRequest.json 
b/clients/src/main/resources/common/message/DescribeConfigsRequest.json
index a382d9fecaf..d1a85a67fea 100644
--- a/clients/src/main/resources/common/message/DescribeConfigsRequest.json
+++ b/clients/src/main/resources/common/message/DescribeConfigsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 32,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["broker", "controller"],
   "name": "DescribeConfigsRequest",
   // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
   // Version 1 adds IncludeSynonyms and removes IsDefault.
diff --git 
a/clients/src/main/resources/common/message/DescribeDelegationTokenRequest.json 
b/clients/src/main/resources/common/message/DescribeDelegationTokenRequest.json
index d62eb28a29f..bc29789b689 100644
--- 
a/clients/src/main/resources/common/message/DescribeDelegationTokenRequest.json
+++ 
b/clients/src/main/resources/common/message/DescribeDelegationTokenRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 41,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["broker", "controller"],
   "name": "DescribeDelegationTokenRequest",
   // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
   // Version 1 is the same as version 0.
diff --git 
a/clients/src/main/resources/common/message/DescribeGroupsRequest.json 
b/clients/src/main/resources/common/message/DescribeGroupsRequest.json
index 8dabf71bd52..cec56852cad 100644
--- a/clients/src/main/resources/common/message/DescribeGroupsRequest.json
+++ b/clients/src/main/resources/common/message/DescribeGroupsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 15,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["broker"],
   "name": "DescribeGroupsRequest",
   // Versions 1 and 2 are the same as version 0.
   //
diff --git 
a/clients/src/main/resources/common/message/DescribeLogDirsRequest.json 
b/clients/src/main/resources/common/message/DescribeLogDirsRequest.json
index 115947ff394..4f3bfa2c58c 100644
--- a/clients/src/main/resources/common/message/DescribeLogDirsRequest.json
+++ b/clients/src/main/resources/common/message/DescribeLogDirsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 35,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["broker"],
   "name": "DescribeLogDirsRequest",
   // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
   // Version 1 is the same as version 0.
diff --git 
a/clients/src/main/resources/common/message/DescribeProducersRequest.json 
b/clients/src/main/resources/common/message/DescribeProducersRequest.json
index 7a54c65622d..b7889ef1f1e 100644
--- a/clients/src/main/resources/common/message/DescribeProducersRequest.json
+++ b/clients/src/main/resources/common/message/DescribeProducersRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 61,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["broker"],
   "name": "DescribeProducersRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git 
a/clients/src/main/resources/common/message/DescribeTransactionsRequest.json 
b/clients/src/main/resources/common/message/DescribeTransactionsRequest.json
index 442f11f8b0b..f7349d60cd1 100644
--- a/clients/src/main/resources/common/message/DescribeTransactionsRequest.json
+++ b/clients/src/main/resources/common/message/DescribeTransactionsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 65,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["broker"],
   "name": "DescribeTransactionsRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git 
a/clients/src/main/resources/common/message/DescribeUserScramCredentialsRequest.json
 
b/clients/src/main/resources/common/message/DescribeUserScramCredentialsRequest.json
index 2f7a1112c48..cde4b7cc844 100644
--- 
a/clients/src/main/resources/common/message/DescribeUserScramCredentialsRequest.json
+++ 
b/clients/src/main/resources/common/message/DescribeUserScramCredentialsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 50,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["broker", "controller"],
   "name": "DescribeUserScramCredentialsRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git a/clients/src/main/resources/common/message/ElectLeadersRequest.json 
b/clients/src/main/resources/common/message/ElectLeadersRequest.json
index dd9fa216415..bce04585a70 100644
--- a/clients/src/main/resources/common/message/ElectLeadersRequest.json
+++ b/clients/src/main/resources/common/message/ElectLeadersRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 43,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["broker", "controller"],
   "name": "ElectLeadersRequest",
   // Version 1 implements multiple leader election types, as described by 
KIP-460.
   //
diff --git a/clients/src/main/resources/common/message/EndTxnRequest.json 
b/clients/src/main/resources/common/message/EndTxnRequest.json
index 80ac5c5d255..f11c7a3268f 100644
--- a/clients/src/main/resources/common/message/EndTxnRequest.json
+++ b/clients/src/main/resources/common/message/EndTxnRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 26,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["broker"],
   "name": "EndTxnRequest",
   // Version 1 is the same as version 0.
   //
diff --git a/clients/src/main/resources/common/message/EnvelopeRequest.json 
b/clients/src/main/resources/common/message/EnvelopeRequest.json
index a30a50ba684..1f6ff62de8d 100644
--- a/clients/src/main/resources/common/message/EnvelopeRequest.json
+++ b/clients/src/main/resources/common/message/EnvelopeRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 58,
   "type": "request",
-  "listeners": ["controller", "zkBroker"],
+  "listeners": ["controller"],
   "name": "EnvelopeRequest",
   // Request struct for forwarding.
   "validVersions": "0",
diff --git 
a/clients/src/main/resources/common/message/ExpireDelegationTokenRequest.json 
b/clients/src/main/resources/common/message/ExpireDelegationTokenRequest.json
index 2694243f1f3..92a9e4e947f 100644
--- 
a/clients/src/main/resources/common/message/ExpireDelegationTokenRequest.json
+++ 
b/clients/src/main/resources/common/message/ExpireDelegationTokenRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 40,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["broker", "controller"],
   "name": "ExpireDelegationTokenRequest",
   // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
   // Version 1 is the same as version 0.
diff --git a/clients/src/main/resources/common/message/FetchRequest.json 
b/clients/src/main/resources/common/message/FetchRequest.json
index c49dd1a9b0a..b7ad185f60b 100644
--- a/clients/src/main/resources/common/message/FetchRequest.json
+++ b/clients/src/main/resources/common/message/FetchRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 1,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["broker", "controller"],
   "name": "FetchRequest",
   // Versions 0-3 were removed in Apache Kafka 4.0, Version 4 is the new 
baseline.
   //
diff --git 
a/clients/src/main/resources/common/message/FindCoordinatorRequest.json 
b/clients/src/main/resources/common/message/FindCoordinatorRequest.json
index 7a926501f7b..2807f40c857 100644
--- a/clients/src/main/resources/common/message/FindCoordinatorRequest.json
+++ b/clients/src/main/resources/common/message/FindCoordinatorRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 10,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["broker"],
   "name": "FindCoordinatorRequest",
   // Version 1 adds KeyType.
   //
diff --git a/clients/src/main/resources/common/message/HeartbeatRequest.json 
b/clients/src/main/resources/common/message/HeartbeatRequest.json
index dcf776d8ec4..57ef18e9224 100644
--- a/clients/src/main/resources/common/message/HeartbeatRequest.json
+++ b/clients/src/main/resources/common/message/HeartbeatRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 12,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["broker"],
   "name": "HeartbeatRequest",
   // Version 1 and version 2 are the same as version 0.
   //
diff --git 
a/clients/src/main/resources/common/message/IncrementalAlterConfigsRequest.json 
b/clients/src/main/resources/common/message/IncrementalAlterConfigsRequest.json
index d4955c91b85..d908c28012f 100644
--- 
a/clients/src/main/resources/common/message/IncrementalAlterConfigsRequest.json
+++ 
b/clients/src/main/resources/common/message/IncrementalAlterConfigsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 44,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["broker", "controller"],
   "name": "IncrementalAlterConfigsRequest",
   // Version 1 is the first flexible version.
   "validVersions": "0-1",
diff --git 
a/clients/src/main/resources/common/message/InitProducerIdRequest.json 
b/clients/src/main/resources/common/message/InitProducerIdRequest.json
index 39f546dbc04..f1700f3a3d2 100644
--- a/clients/src/main/resources/common/message/InitProducerIdRequest.json
+++ b/clients/src/main/resources/common/message/InitProducerIdRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 22,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["broker"],
   "name": "InitProducerIdRequest",
   // Version 1 is the same as version 0.
   //
diff --git a/clients/src/main/resources/common/message/JoinGroupRequest.json 
b/clients/src/main/resources/common/message/JoinGroupRequest.json
index 2c4c9fdfd62..41d7c1acbae 100644
--- a/clients/src/main/resources/common/message/JoinGroupRequest.json
+++ b/clients/src/main/resources/common/message/JoinGroupRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 11,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["broker"],
   "name": "JoinGroupRequest",
   // Versions 0-1 were removed in Apache Kafka 4.0, Version 2 is the new 
baseline.
   //
diff --git a/clients/src/main/resources/common/message/LeaveGroupRequest.json 
b/clients/src/main/resources/common/message/LeaveGroupRequest.json
index fb16f72eb82..929f4fb468c 100644
--- a/clients/src/main/resources/common/message/LeaveGroupRequest.json
+++ b/clients/src/main/resources/common/message/LeaveGroupRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 13,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["broker"],
   "name": "LeaveGroupRequest",
   // Version 1 and 2 are the same as version 0.
   //
diff --git a/clients/src/main/resources/common/message/ListGroupsRequest.json 
b/clients/src/main/resources/common/message/ListGroupsRequest.json
index 32defaa2033..cbc791e0a5a 100644
--- a/clients/src/main/resources/common/message/ListGroupsRequest.json
+++ b/clients/src/main/resources/common/message/ListGroupsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 16,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["broker"],
   "name": "ListGroupsRequest",
   // Version 1 and 2 are the same as version 0.
   //
diff --git a/clients/src/main/resources/common/message/ListOffsetsRequest.json 
b/clients/src/main/resources/common/message/ListOffsetsRequest.json
index 5a864d8ddc1..6f8ff7d6cf9 100644
--- a/clients/src/main/resources/common/message/ListOffsetsRequest.json
+++ b/clients/src/main/resources/common/message/ListOffsetsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 2,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["broker"],
   "name": "ListOffsetsRequest",
   // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
   //
diff --git 
a/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json
 
b/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json
index 952a3db0d23..428a256ac30 100644
--- 
a/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json
+++ 
b/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 46,
   "type": "request",
-  "listeners": ["broker", "controller", "zkBroker"],
+  "listeners": ["broker", "controller"],
   "name": "ListPartitionReassignmentsRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git 
a/clients/src/main/resources/common/message/ListTransactionsRequest.json 
b/clients/src/main/resources/common/message/ListTransactionsRequest.json
index 4879c4d5f95..5d7c688da22 100644
--- a/clients/src/main/resources/common/message/ListTransactionsRequest.json
+++ b/clients/src/main/resources/common/message/ListTransactionsRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 66,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["broker"],
   "name": "ListTransactionsRequest",
   // Version 1: adds DurationFilter to list transactions older than specified 
duration
   "validVersions": "0-1",
diff --git a/clients/src/main/resources/common/message/MetadataRequest.json 
b/clients/src/main/resources/common/message/MetadataRequest.json
index eaee4a3453d..c29093239ed 100644
--- a/clients/src/main/resources/common/message/MetadataRequest.json
+++ b/clients/src/main/resources/common/message/MetadataRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 3,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["broker"],
   "name": "MetadataRequest",
   "validVersions": "4-13",
   "flexibleVersions": "9+",
diff --git a/clients/src/main/resources/common/message/OffsetCommitRequest.json 
b/clients/src/main/resources/common/message/OffsetCommitRequest.json
index 8f9e1d74d96..348ed2b90c5 100644
--- a/clients/src/main/resources/common/message/OffsetCommitRequest.json
+++ b/clients/src/main/resources/common/message/OffsetCommitRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 8,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["broker"],
   "name": "OffsetCommitRequest",
   // Versions 0-1 were removed in Apache Kafka 4.0, Version 2 is the new 
baseline.
   //
diff --git a/clients/src/main/resources/common/message/OffsetDeleteRequest.json 
b/clients/src/main/resources/common/message/OffsetDeleteRequest.json
index 4583030060a..1974b67f69f 100644
--- a/clients/src/main/resources/common/message/OffsetDeleteRequest.json
+++ b/clients/src/main/resources/common/message/OffsetDeleteRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 47,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["broker"],
   "name": "OffsetDeleteRequest",
   "validVersions": "0",
   "flexibleVersions": "none",
diff --git a/clients/src/main/resources/common/message/OffsetFetchRequest.json 
b/clients/src/main/resources/common/message/OffsetFetchRequest.json
index d9d97da384b..88f5b568d72 100644
--- a/clients/src/main/resources/common/message/OffsetFetchRequest.json
+++ b/clients/src/main/resources/common/message/OffsetFetchRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 9,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["broker"],
   "name": "OffsetFetchRequest",
   // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
   //
diff --git 
a/clients/src/main/resources/common/message/OffsetForLeaderEpochRequest.json 
b/clients/src/main/resources/common/message/OffsetForLeaderEpochRequest.json
index b2126a40014..dd559bc8777 100644
--- a/clients/src/main/resources/common/message/OffsetForLeaderEpochRequest.json
+++ b/clients/src/main/resources/common/message/OffsetForLeaderEpochRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 23,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["broker"],
   "name": "OffsetForLeaderEpochRequest",
   // Versions 0-1 were removed in Apache Kafka 4.0, Version 2 is the new 
baseline.
   //
diff --git a/clients/src/main/resources/common/message/ProduceRequest.json 
b/clients/src/main/resources/common/message/ProduceRequest.json
index 90e46a3041b..db7d961f137 100644
--- a/clients/src/main/resources/common/message/ProduceRequest.json
+++ b/clients/src/main/resources/common/message/ProduceRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 0,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["broker"],
   "name": "ProduceRequest",
   // Versions 0-2 were removed in Apache Kafka 4.0, Version 3 is the new 
baseline.
   //
diff --git 
a/clients/src/main/resources/common/message/RenewDelegationTokenRequest.json 
b/clients/src/main/resources/common/message/RenewDelegationTokenRequest.json
index 302e5d3e2ba..5ce0a4775db 100644
--- a/clients/src/main/resources/common/message/RenewDelegationTokenRequest.json
+++ b/clients/src/main/resources/common/message/RenewDelegationTokenRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 39,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["broker", "controller"],
   "name": "RenewDelegationTokenRequest",
   // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
   // Version 1 is the same as version 0.
diff --git 
a/clients/src/main/resources/common/message/SaslAuthenticateRequest.json 
b/clients/src/main/resources/common/message/SaslAuthenticateRequest.json
index 3f5558b8120..cdb4247b8a9 100644
--- a/clients/src/main/resources/common/message/SaslAuthenticateRequest.json
+++ b/clients/src/main/resources/common/message/SaslAuthenticateRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 36,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["broker", "controller"],
   "name": "SaslAuthenticateRequest",
   // Version 1 is the same as version 0.
   // Version 2 adds flexible version support
diff --git 
a/clients/src/main/resources/common/message/SaslHandshakeRequest.json 
b/clients/src/main/resources/common/message/SaslHandshakeRequest.json
index a370a80df39..d2189d826ea 100644
--- a/clients/src/main/resources/common/message/SaslHandshakeRequest.json
+++ b/clients/src/main/resources/common/message/SaslHandshakeRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 17,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["broker", "controller"],
   "name": "SaslHandshakeRequest",
   // Version 1 supports SASL_AUTHENTICATE.
   // NOTE: Version cannot be easily bumped due to incorrect
diff --git a/clients/src/main/resources/common/message/SyncGroupRequest.json 
b/clients/src/main/resources/common/message/SyncGroupRequest.json
index 55258441383..1b53df27757 100644
--- a/clients/src/main/resources/common/message/SyncGroupRequest.json
+++ b/clients/src/main/resources/common/message/SyncGroupRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 14,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["broker"],
   "name": "SyncGroupRequest",
   // Versions 1 and 2 are the same as version 0.
   //
diff --git 
a/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json 
b/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json
index fd2c34b7490..59a1f05e097 100644
--- a/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json
+++ b/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 28,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["broker"],
   "name": "TxnOffsetCommitRequest",
   // Version 1 is the same as version 0.
   //
diff --git 
a/clients/src/main/resources/common/message/UpdateFeaturesRequest.json 
b/clients/src/main/resources/common/message/UpdateFeaturesRequest.json
index 8de1eeedd90..e2f9b45d4cb 100644
--- a/clients/src/main/resources/common/message/UpdateFeaturesRequest.json
+++ b/clients/src/main/resources/common/message/UpdateFeaturesRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 57,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["broker", "controller"],
   "name": "UpdateFeaturesRequest",
   // Version 1 adds validate only field.
   //
diff --git 
a/clients/src/main/resources/common/message/WriteTxnMarkersRequest.json 
b/clients/src/main/resources/common/message/WriteTxnMarkersRequest.json
index 933d009b558..cacda4198e4 100644
--- a/clients/src/main/resources/common/message/WriteTxnMarkersRequest.json
+++ b/clients/src/main/resources/common/message/WriteTxnMarkersRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 27,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["broker"],
   "name": "WriteTxnMarkersRequest",
   // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
   //
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index ce7d4d83506..cef48b65bb6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -385,7 +385,7 @@ public class NetworkClientTest {
     private void awaitReady(NetworkClient client, Node node) {
         if (client.discoverBrokerVersions()) {
             
setExpectedApiVersionsResponse(TestUtils.defaultApiVersionsResponse(
-                ApiMessageType.ListenerType.ZK_BROKER));
+                ApiMessageType.ListenerType.BROKER));
         }
         while (!client.ready(node, time.milliseconds()))
             client.poll(1, time.milliseconds());
@@ -1455,7 +1455,7 @@ public class NetworkClientTest {
     }
 
     private ApiVersionsResponse defaultApiVersionsResponse() {
-        return 
TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER);
+        return 
TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.BROKER);
     }
 
     private static class TestCallbackHandler implements 
RequestCompletionHandler {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 0d2439b3b68..4cd4f048358 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -772,7 +772,7 @@ public class KafkaAdminClientTest {
         if (error == Errors.NONE) {
             return new ApiVersionsResponse.Builder().
                 setApiVersions(ApiVersionsResponse.filterApis(
-                    ApiMessageType.ListenerType.ZK_BROKER, false, false)).
+                    ApiMessageType.ListenerType.BROKER, false, false)).
                 setSupportedFeatures(
                     
convertSupportedFeaturesMap(defaultFeatureMetadata().supportedFeatures())).
                 setFinalizedFeatures(
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
index 8657dcfc1e9..6505a167a33 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
@@ -1902,7 +1902,7 @@ public class FetchRequestManagerTest {
                 MetadataRecoveryStrategy.NONE);
 
         ApiVersionsResponse apiVersionsResponse = 
TestUtils.defaultApiVersionsResponse(
-                400, ApiMessageType.ListenerType.ZK_BROKER);
+                400, ApiMessageType.ListenerType.BROKER);
         ByteBuffer buffer = 
RequestTestUtils.serializeResponseWithHeader(apiVersionsResponse, 
ApiKeys.API_VERSIONS.latestVersion(), 0);
 
         selector.delayedReceive(new DelayedReceive(node.idString(), new 
NetworkReceive(node.idString(), buffer)));
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index ede973c5f9b..729668f8076 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1888,7 +1888,7 @@ public class FetcherTest {
                 MetadataRecoveryStrategy.NONE);
 
         ApiVersionsResponse apiVersionsResponse = 
TestUtils.defaultApiVersionsResponse(
-            400, ApiMessageType.ListenerType.ZK_BROKER);
+            400, ApiMessageType.ListenerType.BROKER);
         ByteBuffer buffer = 
RequestTestUtils.serializeResponseWithHeader(apiVersionsResponse, 
ApiKeys.API_VERSIONS.latestVersion(), 0);
 
         selector.delayedReceive(new DelayedReceive(node.idString(), new 
NetworkReceive(node.idString(), buffer)));
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index c4a24e287fb..25b9333da5b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -221,7 +221,7 @@ public class SenderTest {
                 MetadataRecoveryStrategy.NONE);
 
         ApiVersionsResponse apiVersionsResponse = 
TestUtils.defaultApiVersionsResponse(
-            400, ApiMessageType.ListenerType.ZK_BROKER);
+            400, ApiMessageType.ListenerType.BROKER);
         ByteBuffer buffer = 
RequestTestUtils.serializeResponseWithHeader(apiVersionsResponse, 
ApiKeys.API_VERSIONS.latestVersion(), 0);
 
         selector.delayedReceive(new DelayedReceive(node.idString(), new 
NetworkReceive(node.idString(), buffer)));
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java 
b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index 9bdc7c38c44..90dd34bb078 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -129,7 +129,7 @@ public final class NioEchoServer extends Thread {
             if (channelBuilder == null)
                 channelBuilder = 
ChannelBuilders.serverChannelBuilder(listenerName, false,
                         securityProtocol, config, credentialCache, tokenCache, 
time, logContext,
-                        version -> 
TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER));
+                        version -> 
TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.BROKER));
             this.metrics = new Metrics();
             this.selector = new Selector(10000, failedAuthenticationDelayMs, 
metrics, time,
                     "MetricGroup", channelBuilder, logContext);
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
 
b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
index eae6fb0b0a0..3366a46e069 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
@@ -179,7 +179,7 @@ public class SaslChannelBuilderTest {
     }
 
     private Function<Short, ApiVersionsResponse> defaultApiVersionsSupplier() {
-        return version -> 
TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER);
+        return version -> 
TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.BROKER);
     }
 
     private SaslChannelBuilder createChannelBuilder(SecurityProtocol 
securityProtocol, String saslMechanism) {
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
 
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 8a5bf6fdad4..9208171d1a9 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -1356,7 +1356,7 @@ public class SslTransportLayerTest {
     }
 
     private Function<Short, ApiVersionsResponse> defaultApiVersionsSupplier() {
-        return version -> 
TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER);
+        return version -> 
TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.BROKER);
     }
 
     static class TestSslChannelBuilder extends SslChannelBuilder {
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
index f87ad0fbf54..dd8b8144a29 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
@@ -33,9 +33,11 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -99,7 +101,7 @@ public class ApiVersionsResponseTest {
         );
 
         ApiVersionCollection commonResponse = 
ApiVersionsResponse.intersectForwardableApis(
-            ApiMessageType.ListenerType.ZK_BROKER,
+            ApiMessageType.ListenerType.BROKER,
             activeControllerApiVersions,
             true,
             false
@@ -111,20 +113,19 @@ public class ApiVersionsResponseTest {
             ApiKeys.JOIN_GROUP.latestVersion(), commonResponse);
     }
 
-    @ParameterizedTest
-    @EnumSource(names = {"ZK_BROKER", "BROKER"})
-    public void 
shouldReturnAllKeysWhenThrottleMsIsDefaultThrottle(ListenerType listenerType) {
+    @Test
+    public void shouldReturnAllKeysWhenThrottleMsIsDefaultThrottle() {
         ApiVersionsResponse response = new ApiVersionsResponse.Builder().
             setThrottleTimeMs(AbstractResponse.DEFAULT_THROTTLE_TIME).
             setApiVersions(ApiVersionsResponse.filterApis(
-                listenerType,
+                ListenerType.BROKER,
                 true,
                 true)).
             setSupportedFeatures(Features.emptySupportedFeatures()).
             setFinalizedFeatures(Collections.emptyMap()).
             
setFinalizedFeaturesEpoch(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH).
             build();
-        assertEquals(new HashSet<>(ApiKeys.apisForListener(listenerType)), 
apiKeysInResponse(response));
+        assertEquals(new 
HashSet<>(ApiKeys.apisForListener(ListenerType.BROKER)), 
apiKeysInResponse(response));
         assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, 
response.throttleTimeMs());
         assertTrue(response.data().supportedFeatures().isEmpty());
         assertTrue(response.data().finalizedFeatures().isEmpty());
@@ -160,25 +161,30 @@ public class ApiVersionsResponseTest {
             build();
         verifyApiKeysForTelemetry(response, 0);
     }
-
+    
     @Test
-    public void testMetadataQuorumApisAreDisabled() {
+    public void testBrokerApisAreEnabled() {
         ApiVersionsResponse response = new ApiVersionsResponse.Builder().
             setThrottleTimeMs(AbstractResponse.DEFAULT_THROTTLE_TIME).
             setApiVersions(ApiVersionsResponse.filterApis(
-                ListenerType.ZK_BROKER,
+                ListenerType.BROKER,
                 true,
                 true)).
             setSupportedFeatures(Features.emptySupportedFeatures()).
             setFinalizedFeatures(Collections.emptyMap()).
             
setFinalizedFeaturesEpoch(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH).
             build();
-        // Ensure that APIs needed for the KRaft mode are not exposed through 
ApiVersions until we are ready for them
-        HashSet<ApiKeys> exposedApis = apiKeysInResponse(response);
-        assertFalse(exposedApis.contains(ApiKeys.VOTE));
-        assertFalse(exposedApis.contains(ApiKeys.BEGIN_QUORUM_EPOCH));
-        assertFalse(exposedApis.contains(ApiKeys.END_QUORUM_EPOCH));
-        assertFalse(exposedApis.contains(ApiKeys.DESCRIBE_QUORUM));
+
+        Set<ApiKeys> exposed = apiKeysInResponse(response);
+
+
+        Arrays.stream(ApiKeys.values())
+            .filter(key -> 
key.messageType.listeners().contains(ListenerType.BROKER))
+            .forEach(key -> assertTrue(exposed.contains(key)));
+        Arrays.stream(ApiKeys.values())
+            .filter(key -> key.messageType.listeners()
+                .stream().noneMatch(listener -> listener == 
ListenerType.BROKER))
+            .forEach(key -> assertFalse(exposed.contains(key)));
     }
 
     @Test
@@ -251,12 +257,6 @@ public class ApiVersionsResponseTest {
         assertEquals(expectedVersionsForForwardableAPI, 
commonResponse.find(forwardableAPIKey));
     }
 
-    private void verifyApiKeysForMagic(ApiVersionsResponse response, Byte 
maxMagic) {
-        for (ApiVersion version : response.data().apiKeys()) {
-            
assertTrue(ApiKeys.forId(version.apiKey()).minRequiredInterBrokerMagic <= 
maxMagic);
-        }
-    }
-
     private void verifyApiKeysForTelemetry(ApiVersionsResponse response, int 
expectedCount) {
         int count = 0;
         for (ApiVersion version : response.data().apiKeys()) {
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 9e708a60d75..c6be52fb189 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -401,7 +401,7 @@ public class RequestResponseTest {
     public void testApiVersionsSerialization() {
         for (short version : API_VERSIONS.allVersions()) {
             checkErrorResponse(createApiVersionRequest(version), new 
UnsupportedVersionException("Not Supported"));
-            
checkResponse(TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER),
 version);
+            
checkResponse(TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.BROKER),
 version);
         }
     }
 
@@ -834,7 +834,7 @@ public class RequestResponseTest {
     }
 
     private ApiVersionsResponse defaultApiVersionsResponse() {
-        return 
TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER);
+        return 
TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.BROKER);
     }
 
     @Test
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 3b1e54dee2c..8261c90014c 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -1107,7 +1107,7 @@ public class SaslAuthenticatorTest {
 
     /**
      * Test that callback handlers are only applied to connections for the 
mechanisms
-     * configured for the handler. Test enables two mechanisms 'PLAIN` and 
`DIGEST-MD5`
+     * configured for the handler. Test enables two mechanisms `PLAIN` and 
`DIGEST-MD5`
      * on the servers with different callback handlers for the two mechanisms. 
Verifies
      * that clients using both mechanisms authenticate successfully.
      */
@@ -1980,7 +1980,7 @@ public class SaslAuthenticatorTest {
 
         Function<Short, ApiVersionsResponse> apiVersionSupplier = version -> {
             ApiVersionsResponse defaultApiVersionResponse = 
TestUtils.defaultApiVersionsResponse(
-                ApiMessageType.ListenerType.ZK_BROKER);
+                ApiMessageType.ListenerType.BROKER);
             ApiVersionCollection apiVersions = new ApiVersionCollection();
             for (ApiVersion apiVersion : 
defaultApiVersionResponse.data().apiKeys()) {
                 if (apiVersion.apiKey() != ApiKeys.SASL_AUTHENTICATE.id) {
@@ -2574,7 +2574,7 @@ public class SaslAuthenticatorTest {
                 DelegationTokenCache tokenCache, Time time) {
             super(connectionMode, jaasContexts, securityProtocol, 
listenerName, isInterBrokerListener, clientSaslMechanism,
                 credentialCache, tokenCache, null, time, new LogContext(),
-                version -> 
TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER));
+                version -> 
TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.BROKER));
         }
 
         @Override
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
index 81df34f85f4..0cc6f6f94b4 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
@@ -401,7 +401,7 @@ public class SaslServerAuthenticatorTest {
         Map<String, AuthenticateCallbackHandler> callbackHandlers = 
Collections.singletonMap(
                 mechanism, new SaslServerCallbackHandler());
         ApiVersionsResponse apiVersionsResponse = 
TestUtils.defaultApiVersionsResponse(
-                ApiMessageType.ListenerType.ZK_BROKER);
+                ApiMessageType.ListenerType.BROKER);
         Map<String, Long> connectionsMaxReauthMsByMechanism = maxReauth != 
null ?
                 Collections.singletonMap(mechanism, maxReauth) : 
Collections.emptyMap();
 
diff --git 
a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala 
b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
index 667b5523336..aa202c0df10 100644
--- 
a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
@@ -270,7 +270,7 @@ class GssapiAuthenticationTest extends 
IntegrationTestHarness with SaslSetup {
     val jaasContexts = Collections.singletonMap("GSSAPI", 
JaasContext.loadClientContext(config.values()))
     val channelBuilder = new SaslChannelBuilder(ConnectionMode.CLIENT, 
jaasContexts, securityProtocol,
       null, false, kafkaClientSaslMechanism, null, null, null, time, new 
LogContext(),
-      _ => 
org.apache.kafka.test.TestUtils.defaultApiVersionsResponse(ListenerType.ZK_BROKER))
 {
+      _ => 
org.apache.kafka.test.TestUtils.defaultApiVersionsResponse(ListenerType.BROKER))
 {
       override protected def defaultLoginClass(): Class[_ <: Login] = 
classOf[TestableKerberosLogin]
     }
     channelBuilder.configure(config.values())
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
index 341c859bf32..1d506b3b7c2 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
@@ -86,7 +86,7 @@ class ApiVersionManagerTest {
     )))
 
     val versionManager = new DefaultApiVersionManager(
-      listenerType = ListenerType.ZK_BROKER,
+      listenerType = ListenerType.BROKER,
       forwardingManager = forwardingManager,
       brokerFeatures = brokerFeatures,
       metadataCache = metadataCache,
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala 
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 40260e62613..0df8fcb7092 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -183,7 +183,7 @@ class RequestQuotaTest extends BaseRequestTest {
   def testUnauthorizedThrottle(quorum: String): Unit = {
     RequestQuotaTest.principal = RequestQuotaTest.UnauthorizedPrincipal
 
-    val apiKeys = ApiKeys.kraftBrokerApis
+    val apiKeys = ApiKeys.brokerApis
     for (apiKey <- apiKeys.asScala.toSet -- RequestQuotaTest.Envelope) {
       submitTest(apiKey, () => checkUnauthorizedRequestThrottle(apiKey))
     }
@@ -192,11 +192,11 @@ class RequestQuotaTest extends BaseRequestTest {
   }
 
   private def clientActions: Set[ApiKeys] = {
-    ApiKeys.kraftBrokerApis.asScala.toSet -- clusterActions -- 
RequestQuotaTest.SaslActions -- RequestQuotaTest.Envelope
+    ApiKeys.brokerApis.asScala.toSet -- clusterActions -- 
RequestQuotaTest.SaslActions -- RequestQuotaTest.Envelope
   }
 
   private def clusterActions: Set[ApiKeys] = {
-    ApiKeys.kraftBrokerApis.asScala.filter(_.clusterAction).toSet
+    ApiKeys.brokerApis.asScala.filter(_.clusterAction).toSet
   }
 
   private def clusterActionsWithThrottleForBroker: Set[ApiKeys] = {
diff --git 
a/generator/src/main/java/org/apache/kafka/message/RequestListenerType.java 
b/generator/src/main/java/org/apache/kafka/message/RequestListenerType.java
index 729da21f4c4..ac3bfca480b 100644
--- a/generator/src/main/java/org/apache/kafka/message/RequestListenerType.java
+++ b/generator/src/main/java/org/apache/kafka/message/RequestListenerType.java
@@ -19,8 +19,6 @@ package org.apache.kafka.message;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
 public enum RequestListenerType {
-    @JsonProperty("zkBroker")
-    ZK_BROKER,
 
     @JsonProperty("broker")
     BROKER,

Reply via email to