This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 2.4 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push: new eac1b31 KAFKA-8985; Add flexible version support to inter-broker APIs (#7453) eac1b31 is described below commit eac1b3140510d81d50d96ef118a2df7126019caf Author: Jason Gustafson <ja...@confluent.io> AuthorDate: Mon Oct 7 09:21:14 2019 -0700 KAFKA-8985; Add flexible version support to inter-broker APIs (#7453) This patch adds flexible version support for the following inter-broker APIs: ControlledShutdown, LeaderAndIsr, UpdateMetadata, and StopReplica. Version checks have been removed from `getErrorResponse` methods since they were redundant given the checks in `AbstractRequest` and the respective`*Data` types. Reviewers: Ismael Juma <ism...@juma.me.uk> --- .../common/requests/ControlledShutdownRequest.java | 8 ++-- .../kafka/common/requests/LeaderAndIsrRequest.java | 13 +----- .../kafka/common/requests/StopReplicaRequest.java | 11 +---- .../common/requests/UpdateMetadataRequest.java | 9 ++-- .../common/message/ControlledShutdownRequest.json | 4 +- .../common/message/ControlledShutdownResponse.json | 4 +- .../common/message/LeaderAndIsrRequest.json | 4 +- .../common/message/LeaderAndIsrResponse.json | 4 +- .../common/message/StopReplicaRequest.json | 4 +- .../common/message/StopReplicaResponse.json | 4 +- .../common/message/UpdateMetadataRequest.json | 4 +- .../common/message/UpdateMetadataResponse.json | 4 +- .../requests/ControlledShutdownRequestTest.java | 51 ++++++++++++++++++++++ .../common/requests/LeaderAndIsrRequestTest.java | 24 ++++++++++ .../common/requests/StopReplicaRequestTest.java | 27 ++++++++++++ .../common/requests/UpdateMetadataRequestTest.java | 24 ++++++++++ core/src/main/scala/kafka/api/ApiVersion.scala | 11 ++++- .../controller/ControllerChannelManager.scala | 9 ++-- core/src/main/scala/kafka/server/KafkaServer.scala | 5 ++- .../test/scala/unit/kafka/api/ApiVersionTest.scala | 3 +- .../controller/ControllerChannelManagerTest.scala | 12 +++-- 21 files changed, 180 insertions(+), 59 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java index 83e9444..7123845 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java @@ -24,7 +24,6 @@ import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; - public class ControlledShutdownRequest extends AbstractRequest { public static class Builder extends AbstractRequest.Builder<ControlledShutdownRequest> { @@ -63,9 +62,10 @@ public class ControlledShutdownRequest extends AbstractRequest { } @Override - public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { - return new ControlledShutdownResponse(new ControlledShutdownResponseData(). - setErrorCode(Errors.forException(e).code())); + public ControlledShutdownResponse getErrorResponse(int throttleTimeMs, Throwable e) { + ControlledShutdownResponseData data = new ControlledShutdownResponseData() + .setErrorCode(Errors.forException(e).code()); + return new ControlledShutdownResponse(data); } public static ControlledShutdownRequest parse(ByteBuffer buffer, short version) { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java index 63013ff..270069a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java @@ -146,18 +146,7 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { .setErrorCode(error.code())); } responseData.setPartitionErrors(partitions); - - short versionId = version(); - switch (versionId) { - case 0: - case 1: - case 2: - case 3: - return new LeaderAndIsrResponse(responseData); - default: - throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", - versionId, this.getClass().getSimpleName(), ApiKeys.LEADER_AND_ISR.latestVersion())); - } + return new LeaderAndIsrResponse(responseData); } @Override diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java index 425f66a..0f60e20 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java @@ -116,16 +116,7 @@ public class StopReplicaRequest extends AbstractControlRequest { .setErrorCode(error.code())); } data.setPartitionErrors(partitions); - - short versionId = version(); - switch (versionId) { - case 0: - case 1: - return new StopReplicaResponse(data); - default: - throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", - versionId, this.getClass().getSimpleName(), ApiKeys.STOP_REPLICA.latestVersion())); - } + return new StopReplicaResponse(data); } public boolean deletePartitions() { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java index 90dbff3..49f962d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java @@ -187,12 +187,9 @@ public class UpdateMetadataRequest extends AbstractControlRequest { @Override public UpdateMetadataResponse getErrorResponse(int throttleTimeMs, Throwable e) { - short version = version(); - if (version <= 5) - return new UpdateMetadataResponse(new UpdateMetadataResponseData().setErrorCode(Errors.forException(e).code())); - else - throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", - version, this.getClass().getSimpleName(), ApiKeys.UPDATE_METADATA.latestVersion())); + UpdateMetadataResponseData data = new UpdateMetadataResponseData() + .setErrorCode(Errors.forException(e).code()); + return new UpdateMetadataResponse(data); } public Iterable<UpdateMetadataPartitionState> partitionStates() { diff --git a/clients/src/main/resources/common/message/ControlledShutdownRequest.json b/clients/src/main/resources/common/message/ControlledShutdownRequest.json index 6592e78..5756d1c 100644 --- a/clients/src/main/resources/common/message/ControlledShutdownRequest.json +++ b/clients/src/main/resources/common/message/ControlledShutdownRequest.json @@ -24,8 +24,8 @@ // Version 1 is the same as version 0. // // Version 2 adds BrokerEpoch. - "validVersions": "0-2", - "flexibleVersions": "none", + "validVersions": "0-3", + "flexibleVersions": "3+", "fields": [ { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId", "about": "The id of the broker for which controlled shutdown has been requested." }, diff --git a/clients/src/main/resources/common/message/ControlledShutdownResponse.json b/clients/src/main/resources/common/message/ControlledShutdownResponse.json index dc61d70..27feb1b 100644 --- a/clients/src/main/resources/common/message/ControlledShutdownResponse.json +++ b/clients/src/main/resources/common/message/ControlledShutdownResponse.json @@ -18,8 +18,8 @@ "type": "response", "name": "ControlledShutdownResponse", // Versions 1 and 2 are the same as version 0. - "validVersions": "0-2", - "flexibleVersions": "none", + "validVersions": "0-3", + "flexibleVersions": "3+", "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The top-level error code." }, diff --git a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json index 8af1df5..8529688 100644 --- a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json +++ b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json @@ -22,8 +22,8 @@ // Version 2 adds broker epoch and reorganizes the partitions by topic. // // Version 3 adds AddingReplicas and RemovingReplicas - "validVersions": "0-3", - "flexibleVersions": "none", + "validVersions": "0-4", + "flexibleVersions": "4+", "fields": [ { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId", "about": "The current controller ID." }, diff --git a/clients/src/main/resources/common/message/LeaderAndIsrResponse.json b/clients/src/main/resources/common/message/LeaderAndIsrResponse.json index 3b67c4c..10c3cd9 100644 --- a/clients/src/main/resources/common/message/LeaderAndIsrResponse.json +++ b/clients/src/main/resources/common/message/LeaderAndIsrResponse.json @@ -22,8 +22,8 @@ // Version 2 is the same as version 1. // // Version 3 is the same as version 2. - "validVersions": "0-3", - "flexibleVersions": "none", + "validVersions": "0-4", + "flexibleVersions": "4+", "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." }, diff --git a/clients/src/main/resources/common/message/StopReplicaRequest.json b/clients/src/main/resources/common/message/StopReplicaRequest.json index 695c0c1..5c13705 100644 --- a/clients/src/main/resources/common/message/StopReplicaRequest.json +++ b/clients/src/main/resources/common/message/StopReplicaRequest.json @@ -19,8 +19,8 @@ "name": "StopReplicaRequest", // Version 1 adds the broker epoch and reorganizes the partitions to be stored // per topic. - "validVersions": "0-1", - "flexibleVersions": "none", + "validVersions": "0-2", + "flexibleVersions": "2+", "fields": [ { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId", "about": "The controller id." }, diff --git a/clients/src/main/resources/common/message/StopReplicaResponse.json b/clients/src/main/resources/common/message/StopReplicaResponse.json index d55a7b5..d864e91 100644 --- a/clients/src/main/resources/common/message/StopReplicaResponse.json +++ b/clients/src/main/resources/common/message/StopReplicaResponse.json @@ -18,8 +18,8 @@ "type": "response", "name": "StopReplicaResponse", // Version 1 is the same as version 0. - "validVersions": "0-1", - "flexibleVersions": "none", + "validVersions": "0-2", + "flexibleVersions": "2+", "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The top-level error code, or 0 if there was no top-level error." }, diff --git a/clients/src/main/resources/common/message/UpdateMetadataRequest.json b/clients/src/main/resources/common/message/UpdateMetadataRequest.json index 203ee44..3c45f83 100644 --- a/clients/src/main/resources/common/message/UpdateMetadataRequest.json +++ b/clients/src/main/resources/common/message/UpdateMetadataRequest.json @@ -26,8 +26,8 @@ // Version 4 adds the offline replica list. // // Version 5 adds the broker epoch field and normalizes partitions by topic. - "validVersions": "0-5", - "flexibleVersions": "none", + "validVersions": "0-6", + "flexibleVersions": "6+", "fields": [ { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId", "about": "The controller id." }, diff --git a/clients/src/main/resources/common/message/UpdateMetadataResponse.json b/clients/src/main/resources/common/message/UpdateMetadataResponse.json index 5990beb..aaebed0 100644 --- a/clients/src/main/resources/common/message/UpdateMetadataResponse.json +++ b/clients/src/main/resources/common/message/UpdateMetadataResponse.json @@ -18,8 +18,8 @@ "type": "response", "name": "UpdateMetadataResponse", // Versions 1, 2, 3, 4, and 5 are the same as version 0 - "validVersions": "0-5", - "flexibleVersions": "none", + "validVersions": "0-6", + "flexibleVersions": "6+", "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ControlledShutdownRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ControlledShutdownRequestTest.java new file mode 100644 index 0000000..b7ec3da --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/requests/ControlledShutdownRequestTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.errors.ClusterAuthorizationException; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.message.ControlledShutdownRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.junit.Test; + +import static org.apache.kafka.common.protocol.ApiKeys.CONTROLLED_SHUTDOWN; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +public class ControlledShutdownRequestTest { + + @Test + public void testUnsupportedVersion() { + ControlledShutdownRequest.Builder builder = new ControlledShutdownRequest.Builder( + new ControlledShutdownRequestData().setBrokerId(1), + (short) (CONTROLLED_SHUTDOWN.latestVersion() + 1)); + assertThrows(UnsupportedVersionException.class, builder::build); + } + + @Test + public void testGetErrorResponse() { + for (short version = CONTROLLED_SHUTDOWN.oldestVersion(); version < CONTROLLED_SHUTDOWN.latestVersion(); version++) { + ControlledShutdownRequest.Builder builder = new ControlledShutdownRequest.Builder( + new ControlledShutdownRequestData().setBrokerId(1), version); + ControlledShutdownRequest request = builder.build(); + ControlledShutdownResponse response = request.getErrorResponse(0, + new ClusterAuthorizationException("Not authorized")); + assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED, response.error()); + } + } + +} diff --git a/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java index 6ad6197..2235a8f 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java @@ -18,10 +18,13 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ClusterAuthorizationException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.LeaderAndIsrRequestData; import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrLiveLeader; import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState; import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.MessageTestUtil; import org.apache.kafka.test.TestUtils; import org.junit.Test; @@ -39,10 +42,31 @@ import static java.util.Arrays.asList; import static java.util.Collections.emptyList; import static org.apache.kafka.common.protocol.ApiKeys.LEADER_AND_ISR; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; public class LeaderAndIsrRequestTest { + @Test + public void testUnsupportedVersion() { + LeaderAndIsrRequest.Builder builder = new LeaderAndIsrRequest.Builder( + (short) (LEADER_AND_ISR.latestVersion() + 1), 0, 0, 0, + Collections.emptyList(), Collections.emptySet()); + assertThrows(UnsupportedVersionException.class, builder::build); + } + + @Test + public void testGetErrorResponse() { + for (short version = LEADER_AND_ISR.oldestVersion(); version < LEADER_AND_ISR.latestVersion(); version++) { + LeaderAndIsrRequest.Builder builder = new LeaderAndIsrRequest.Builder(version, 0, 0, 0, + Collections.emptyList(), Collections.emptySet()); + LeaderAndIsrRequest request = builder.build(); + LeaderAndIsrResponse response = request.getErrorResponse(0, + new ClusterAuthorizationException("Not authorized")); + assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED, response.error()); + } + } + /** * Verifies the logic we have in LeaderAndIsrRequest to present a unified interface across the various versions * works correctly. For example, `LeaderAndIsrPartitionState.topicName` is not serialiazed/deserialized in diff --git a/clients/src/test/java/org/apache/kafka/common/requests/StopReplicaRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/StopReplicaRequestTest.java index a143ff3..5bb4998 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/StopReplicaRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/StopReplicaRequestTest.java @@ -17,17 +17,44 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ClusterAuthorizationException; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.MessageTestUtil; import org.apache.kafka.test.TestUtils; import org.junit.Test; +import java.util.Collections; import java.util.Set; +import static org.apache.kafka.common.protocol.ApiKeys.STOP_REPLICA; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; public class StopReplicaRequestTest { @Test + public void testUnsupportedVersion() { + StopReplicaRequest.Builder builder = new StopReplicaRequest.Builder( + (short) (STOP_REPLICA.latestVersion() + 1), + 0, 0, 0L, false, Collections.emptyList()); + assertThrows(UnsupportedVersionException.class, builder::build); + } + + @Test + public void testGetErrorResponse() { + for (short version = STOP_REPLICA.oldestVersion(); version < STOP_REPLICA.latestVersion(); version++) { + StopReplicaRequest.Builder builder = new StopReplicaRequest.Builder(version, + 0, 0, 0L, false, Collections.emptyList()); + StopReplicaRequest request = builder.build(); + StopReplicaResponse response = request.getErrorResponse(0, + new ClusterAuthorizationException("Not authorized")); + assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED, response.error()); + } + } + + @Test public void testStopReplicaRequestNormalization() { Set<TopicPartition> tps = TestUtils.generateRandomTopicPartitions(10, 10); StopReplicaRequest.Builder builder = new StopReplicaRequest.Builder((short) 5, 0, 0, 0, false, tps); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/UpdateMetadataRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/UpdateMetadataRequestTest.java index a5d0d99..fe688ce 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/UpdateMetadataRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/UpdateMetadataRequestTest.java @@ -17,12 +17,15 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ClusterAuthorizationException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.UpdateMetadataRequestData; import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataBroker; import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataEndpoint; import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.MessageTestUtil; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.test.TestUtils; @@ -41,10 +44,31 @@ import static java.util.Arrays.asList; import static java.util.Collections.emptyList; import static org.apache.kafka.common.protocol.ApiKeys.UPDATE_METADATA; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; public class UpdateMetadataRequestTest { + @Test + public void testUnsupportedVersion() { + UpdateMetadataRequest.Builder builder = new UpdateMetadataRequest.Builder( + (short) (UPDATE_METADATA.latestVersion() + 1), 0, 0, 0, + Collections.emptyList(), Collections.emptyList()); + assertThrows(UnsupportedVersionException.class, builder::build); + } + + @Test + public void testGetErrorResponse() { + for (short version = UPDATE_METADATA.oldestVersion(); version < UPDATE_METADATA.latestVersion(); version++) { + UpdateMetadataRequest.Builder builder = new UpdateMetadataRequest.Builder( + version, 0, 0, 0, Collections.emptyList(), Collections.emptyList()); + UpdateMetadataRequest request = builder.build(); + UpdateMetadataResponse response = request.getErrorResponse(0, + new ClusterAuthorizationException("Not authorized")); + assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED, response.error()); + } + } + /** * Verifies the logic we have in UpdateMetadataRequest to present a unified interface across the various versions * works correctly. For example, `UpdateMetadataPartitionState.topicName` is not serialiazed/deserialized in diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala index 1a33625..90ffb10 100644 --- a/core/src/main/scala/kafka/api/ApiVersion.scala +++ b/core/src/main/scala/kafka/api/ApiVersion.scala @@ -92,7 +92,9 @@ object ApiVersion { // Add rack_id to FetchRequest, preferred_read_replica to FetchResponse, and replica_id to OffsetsForLeaderRequest KAFKA_2_3_IV1, // Add adding_replicas and removing_replicas fields to LeaderAndIsrRequest - KAFKA_2_4_IV0 + KAFKA_2_4_IV0, + // Flexible version support in inter-broker APIs + KAFKA_2_4_IV1 ) // Map keys are the union of the short and full versions @@ -325,6 +327,13 @@ case object KAFKA_2_4_IV0 extends DefaultApiVersion { val id: Int = 24 } +case object KAFKA_2_4_IV1 extends DefaultApiVersion { + val shortVersion: String = "2.4" + val subVersion = "IV1" + val recordVersion = RecordVersion.V2 + val id: Int = 25 +} + object ApiVersionValidator extends Validator { override def ensureValid(name: String, value: Any): Unit = { diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 6c7450a..c60130b 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -447,7 +447,8 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig, private def sendLeaderAndIsrRequest(controllerEpoch: Int, stateChangeLog: StateChangeLogger): Unit = { val leaderAndIsrRequestVersion: Short = - if (config.interBrokerProtocolVersion >= KAFKA_2_4_IV0) 3 + if (config.interBrokerProtocolVersion >= KAFKA_2_4_IV1) 4 + else if (config.interBrokerProtocolVersion >= KAFKA_2_4_IV0) 3 else if (config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 2 else if (config.interBrokerProtocolVersion >= KAFKA_1_0_IV0) 1 else 0 @@ -483,7 +484,8 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig, val partitionStates = updateMetadataRequestPartitionInfoMap.values.toBuffer val updateMetadataRequestVersion: Short = - if (config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 5 + if (config.interBrokerProtocolVersion >= KAFKA_2_4_IV1) 6 + else if (config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 5 else if (config.interBrokerProtocolVersion >= KAFKA_1_0_IV0) 4 else if (config.interBrokerProtocolVersion >= KAFKA_0_10_2_IV0) 3 else if (config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV1) 2 @@ -528,7 +530,8 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig, private def sendStopReplicaRequests(controllerEpoch: Int): Unit = { val stopReplicaRequestVersion: Short = - if (config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 1 + if (config.interBrokerProtocolVersion >= KAFKA_2_4_IV1) 2 + else if (config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 1 else 0 def stopReplicaPartitionDeleteResponseCallback(brokerId: Int)(response: AbstractResponse): Unit = { diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 57cb0b6..de0ee22 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -24,7 +24,7 @@ import java.util.concurrent._ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} import com.yammer.metrics.core.Gauge -import kafka.api.{KAFKA_0_9_0, KAFKA_2_2_IV0} +import kafka.api.{KAFKA_0_9_0, KAFKA_2_2_IV0, KAFKA_2_4_IV1} import kafka.cluster.Broker import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException, InconsistentBrokerMetadataException, InconsistentClusterIdException} import kafka.controller.KafkaController @@ -528,7 +528,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP val controlledShutdownApiVersion: Short = if (config.interBrokerProtocolVersion < KAFKA_0_9_0) 0 else if (config.interBrokerProtocolVersion < KAFKA_2_2_IV0) 1 - else 2 + else if (config.interBrokerProtocolVersion < KAFKA_2_4_IV1) 2 + else 3 val controlledShutdownRequest = new ControlledShutdownRequest.Builder( new ControlledShutdownRequestData() diff --git a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala index dadef1d..0519ea0 100644 --- a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala +++ b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala @@ -93,8 +93,9 @@ class ApiVersionTest { assertEquals(KAFKA_2_3_IV0, ApiVersion("2.3-IV0")) assertEquals(KAFKA_2_3_IV1, ApiVersion("2.3-IV1")) - assertEquals(KAFKA_2_4_IV0, ApiVersion("2.4")) + assertEquals(KAFKA_2_4_IV1, ApiVersion("2.4")) assertEquals(KAFKA_2_4_IV0, ApiVersion("2.4-IV0")) + assertEquals(KAFKA_2_4_IV1, ApiVersion("2.4-IV1")) } @Test diff --git a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala index e603b25..88607f2 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala @@ -18,7 +18,7 @@ package kafka.controller import java.util.Properties -import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, KAFKA_0_10_2_IV0, KAFKA_0_9_0, KAFKA_1_0_IV0, KAFKA_2_2_IV0, KAFKA_2_4_IV0, LeaderAndIsr} +import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, KAFKA_0_10_2_IV0, KAFKA_0_9_0, KAFKA_1_0_IV0, KAFKA_2_2_IV0, KAFKA_2_4_IV0, KAFKA_2_4_IV1, LeaderAndIsr} import kafka.cluster.{Broker, EndPoint} import kafka.server.KafkaConfig import kafka.utils.TestUtils @@ -157,7 +157,8 @@ class ControllerChannelManagerTest { for (apiVersion <- ApiVersion.allVersions) { val leaderAndIsrRequestVersion: Short = - if (apiVersion >= KAFKA_2_4_IV0) 3 + if (apiVersion >= KAFKA_2_4_IV1) 4 + else if (apiVersion >= KAFKA_2_4_IV0) 3 else if (apiVersion >= KAFKA_2_2_IV0) 2 else if (apiVersion >= KAFKA_1_0_IV0) 1 else 0 @@ -326,7 +327,8 @@ class ControllerChannelManagerTest { for (apiVersion <- ApiVersion.allVersions) { val updateMetadataRequestVersion: Short = - if (apiVersion >= KAFKA_2_2_IV0) 5 + if (apiVersion >= KAFKA_2_4_IV1) 6 + else if (apiVersion >= KAFKA_2_2_IV0) 5 else if (apiVersion >= KAFKA_1_0_IV0) 4 else if (apiVersion >= KAFKA_0_10_2_IV0) 3 else if (apiVersion >= KAFKA_0_10_0_IV1) 2 @@ -597,8 +599,10 @@ class ControllerChannelManagerTest { for (apiVersion <- ApiVersion.allVersions) { if (apiVersion < KAFKA_2_2_IV0) testStopReplicaFollowsInterBrokerProtocolVersion(apiVersion, 0.toShort) - else + else if (apiVersion < KAFKA_2_4_IV1) testStopReplicaFollowsInterBrokerProtocolVersion(apiVersion, 1.toShort) + else + testStopReplicaFollowsInterBrokerProtocolVersion(apiVersion, 2.toShort) } }