This is an automated email from the ASF dual-hosted git repository.
dengziming pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 594156e01bb KAFKA-15287: Change NodeApiVersions.create() to support both zk and kraft (#14185)
594156e01bb is described below
commit 594156e01bb59aab4d6dcbcb62679df9d83f8849
Author: vveicc <[email protected]>
AuthorDate: Fri Aug 11 10:18:13 2023 +0800
KAFKA-15287: Change NodeApiVersions.create() to support both zk and kraft (#14185)
Reviewers: dengziming <[email protected]>
---
.../src/main/java/org/apache/kafka/clients/NodeApiVersions.java | 2 +-
.../test/java/org/apache/kafka/common/protocol/ApiKeysTest.java | 2 +-
.../apache/kafka/common/requests/ApiVersionsResponseTest.java | 9 +++++----
3 files changed, 7 insertions(+), 6 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
index aa6ba5a793a..83722f83d02 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
@@ -68,7 +68,7 @@ public class NodeApiVersions {
*/
public static NodeApiVersions create(Collection<ApiVersion> overrides) {
List<ApiVersion> apiVersions = new LinkedList<>(overrides);
- for (ApiKeys apiKey : ApiKeys.zkBrokerApis()) {
+ for (ApiKeys apiKey : ApiKeys.clientApis()) {
boolean exists = false;
for (ApiVersion apiVersion : apiVersions) {
if (apiVersion.apiKey() == apiKey.id) {
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
index 1aa420b36f0..b9fc6e57ec3 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
@@ -63,7 +63,7 @@ public class ApiKeysTest {
Set<ApiKeys> authenticationKeys = EnumSet.of(ApiKeys.SASL_HANDSHAKE,
ApiKeys.SASL_AUTHENTICATE);
// Newer protocol apis include throttle time ms even for cluster actions
Set<ApiKeys> clusterActionsWithThrottleTimeMs =
EnumSet.of(ApiKeys.ALTER_PARTITION, ApiKeys.ALLOCATE_PRODUCER_IDS,
ApiKeys.UPDATE_FEATURES);
- for (ApiKeys apiKey: ApiKeys.zkBrokerApis()) {
+ for (ApiKeys apiKey: ApiKeys.clientApis()) {
Schema responseSchema =
apiKey.messageType.responseSchemas()[apiKey.latestVersion()];
BoundField throttleTimeField =
responseSchema.get("throttle_time_ms");
if ((apiKey.clusterAction &&
!clusterActionsWithThrottleTimeMs.contains(apiKey))
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
index 1e5f8493f60..19d3c468186 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
@@ -162,8 +162,9 @@ public class ApiVersionsResponseTest {
assertEquals(10, response.data().finalizedFeaturesEpoch());
}
- @Test
- public void
shouldReturnAllKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThrottle() {
+ @ParameterizedTest
+ @EnumSource(names = {"ZK_BROKER", "BROKER"})
+ public void
shouldReturnAllKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThrottle(ListenerType
listenerType) {
ApiVersionsResponse response =
ApiVersionsResponse.createApiVersionsResponse(
AbstractResponse.DEFAULT_THROTTLE_TIME,
RecordVersion.current(),
@@ -171,11 +172,11 @@ public class ApiVersionsResponseTest {
Collections.emptyMap(),
ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
null,
- ListenerType.ZK_BROKER,
+ listenerType,
true,
false
);
- assertEquals(new HashSet<>(ApiKeys.zkBrokerApis()),
apiKeysInResponse(response));
+ assertEquals(new HashSet<>(ApiKeys.apisForListener(listenerType)),
apiKeysInResponse(response));
assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME,
response.throttleTimeMs());
assertTrue(response.data().supportedFeatures().isEmpty());
assertTrue(response.data().finalizedFeatures().isEmpty());