This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new eac1b31  KAFKA-8985; Add flexible version support to inter-broker APIs 
(#7453)
eac1b31 is described below

commit eac1b3140510d81d50d96ef118a2df7126019caf
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Oct 7 09:21:14 2019 -0700

    KAFKA-8985; Add flexible version support to inter-broker APIs (#7453)
    
    This patch adds flexible version support for the following inter-broker 
APIs: ControlledShutdown, LeaderAndIsr, UpdateMetadata, and StopReplica. 
Version checks have been removed from `getErrorResponse` methods since they 
were redundant given the checks in `AbstractRequest` and the respective`*Data` 
types.
    
    Reviewers: Ismael Juma <ism...@juma.me.uk>
---
 .../common/requests/ControlledShutdownRequest.java |  8 ++--
 .../kafka/common/requests/LeaderAndIsrRequest.java | 13 +-----
 .../kafka/common/requests/StopReplicaRequest.java  | 11 +----
 .../common/requests/UpdateMetadataRequest.java     |  9 ++--
 .../common/message/ControlledShutdownRequest.json  |  4 +-
 .../common/message/ControlledShutdownResponse.json |  4 +-
 .../common/message/LeaderAndIsrRequest.json        |  4 +-
 .../common/message/LeaderAndIsrResponse.json       |  4 +-
 .../common/message/StopReplicaRequest.json         |  4 +-
 .../common/message/StopReplicaResponse.json        |  4 +-
 .../common/message/UpdateMetadataRequest.json      |  4 +-
 .../common/message/UpdateMetadataResponse.json     |  4 +-
 .../requests/ControlledShutdownRequestTest.java    | 51 ++++++++++++++++++++++
 .../common/requests/LeaderAndIsrRequestTest.java   | 24 ++++++++++
 .../common/requests/StopReplicaRequestTest.java    | 27 ++++++++++++
 .../common/requests/UpdateMetadataRequestTest.java | 24 ++++++++++
 core/src/main/scala/kafka/api/ApiVersion.scala     | 11 ++++-
 .../controller/ControllerChannelManager.scala      |  9 ++--
 core/src/main/scala/kafka/server/KafkaServer.scala |  5 ++-
 .../test/scala/unit/kafka/api/ApiVersionTest.scala |  3 +-
 .../controller/ControllerChannelManagerTest.scala  | 12 +++--
 21 files changed, 180 insertions(+), 59 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
index 83e9444..7123845 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
-
 public class ControlledShutdownRequest extends AbstractRequest {
 
     public static class Builder extends 
AbstractRequest.Builder<ControlledShutdownRequest> {
@@ -63,9 +62,10 @@ public class ControlledShutdownRequest extends 
AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        return new ControlledShutdownResponse(new 
ControlledShutdownResponseData().
-            setErrorCode(Errors.forException(e).code()));
+    public ControlledShutdownResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
+        ControlledShutdownResponseData data = new 
ControlledShutdownResponseData()
+                .setErrorCode(Errors.forException(e).code());
+        return new ControlledShutdownResponse(data);
     }
 
     public static ControlledShutdownRequest parse(ByteBuffer buffer, short 
version) {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index 63013ff..270069a 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -146,18 +146,7 @@ public class LeaderAndIsrRequest extends 
AbstractControlRequest {
                 .setErrorCode(error.code()));
         }
         responseData.setPartitionErrors(partitions);
-
-        short versionId = version();
-        switch (versionId) {
-            case 0:
-            case 1:
-            case 2:
-            case 3:
-                return new LeaderAndIsrResponse(responseData);
-            default:
-                throw new IllegalArgumentException(String.format("Version %d 
is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), 
ApiKeys.LEADER_AND_ISR.latestVersion()));
-        }
+        return new LeaderAndIsrResponse(responseData);
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
index 425f66a..0f60e20 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
@@ -116,16 +116,7 @@ public class StopReplicaRequest extends 
AbstractControlRequest {
                 .setErrorCode(error.code()));
         }
         data.setPartitionErrors(partitions);
-
-        short versionId = version();
-        switch (versionId) {
-            case 0:
-            case 1:
-                return new StopReplicaResponse(data);
-            default:
-                throw new IllegalArgumentException(String.format("Version %d 
is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), 
ApiKeys.STOP_REPLICA.latestVersion()));
-        }
+        return new StopReplicaResponse(data);
     }
 
     public boolean deletePartitions() {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index 90dbff3..49f962d 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -187,12 +187,9 @@ public class UpdateMetadataRequest extends 
AbstractControlRequest {
 
     @Override
     public UpdateMetadataResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
-        short version = version();
-        if (version <= 5)
-            return new UpdateMetadataResponse(new 
UpdateMetadataResponseData().setErrorCode(Errors.forException(e).code()));
-        else
-            throw new IllegalArgumentException(String.format("Version %d is 
not valid. Valid versions for %s are 0 to %d",
-                version, this.getClass().getSimpleName(), 
ApiKeys.UPDATE_METADATA.latestVersion()));
+        UpdateMetadataResponseData data = new UpdateMetadataResponseData()
+                .setErrorCode(Errors.forException(e).code());
+        return new UpdateMetadataResponse(data);
     }
 
     public Iterable<UpdateMetadataPartitionState> partitionStates() {
diff --git 
a/clients/src/main/resources/common/message/ControlledShutdownRequest.json 
b/clients/src/main/resources/common/message/ControlledShutdownRequest.json
index 6592e78..5756d1c 100644
--- a/clients/src/main/resources/common/message/ControlledShutdownRequest.json
+++ b/clients/src/main/resources/common/message/ControlledShutdownRequest.json
@@ -24,8 +24,8 @@
   // Version 1 is the same as version 0.
   //
   // Version 2 adds BrokerEpoch.
-  "validVersions": "0-2",
-  "flexibleVersions": "none",
+  "validVersions": "0-3",
+  "flexibleVersions": "3+",
   "fields": [
     { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
       "about": "The id of the broker for which controlled shutdown has been 
requested." },
diff --git 
a/clients/src/main/resources/common/message/ControlledShutdownResponse.json 
b/clients/src/main/resources/common/message/ControlledShutdownResponse.json
index dc61d70..27feb1b 100644
--- a/clients/src/main/resources/common/message/ControlledShutdownResponse.json
+++ b/clients/src/main/resources/common/message/ControlledShutdownResponse.json
@@ -18,8 +18,8 @@
   "type": "response",
   "name": "ControlledShutdownResponse",
   // Versions 1 and 2 are the same as version 0.
-  "validVersions": "0-2",
-  "flexibleVersions": "none",
+  "validVersions": "0-3",
+  "flexibleVersions": "3+",
   "fields": [
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
       "about": "The top-level error code." },
diff --git a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json 
b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
index 8af1df5..8529688 100644
--- a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
+++ b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
@@ -22,8 +22,8 @@
   // Version 2 adds broker epoch and reorganizes the partitions by topic.
   //
   // Version 3 adds AddingReplicas and RemovingReplicas
-  "validVersions": "0-3",
-  "flexibleVersions": "none",
+  "validVersions": "0-4",
+  "flexibleVersions": "4+",
   "fields": [
     { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
       "about": "The current controller ID." },
diff --git 
a/clients/src/main/resources/common/message/LeaderAndIsrResponse.json 
b/clients/src/main/resources/common/message/LeaderAndIsrResponse.json
index 3b67c4c..10c3cd9 100644
--- a/clients/src/main/resources/common/message/LeaderAndIsrResponse.json
+++ b/clients/src/main/resources/common/message/LeaderAndIsrResponse.json
@@ -22,8 +22,8 @@
   // Version 2 is the same as version 1.
   //
   // Version 3 is the same as version 2.
-  "validVersions": "0-3",
-  "flexibleVersions": "none",
+  "validVersions": "0-4",
+  "flexibleVersions": "4+",
   "fields": [
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
       "about": "The error code, or 0 if there was no error." },
diff --git a/clients/src/main/resources/common/message/StopReplicaRequest.json 
b/clients/src/main/resources/common/message/StopReplicaRequest.json
index 695c0c1..5c13705 100644
--- a/clients/src/main/resources/common/message/StopReplicaRequest.json
+++ b/clients/src/main/resources/common/message/StopReplicaRequest.json
@@ -19,8 +19,8 @@
   "name": "StopReplicaRequest",
   // Version 1 adds the broker epoch and reorganizes the partitions to be 
stored
   // per topic.
-  "validVersions": "0-1",
-  "flexibleVersions": "none",
+  "validVersions": "0-2",
+  "flexibleVersions": "2+",
   "fields": [
     { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
       "about": "The controller id." },
diff --git a/clients/src/main/resources/common/message/StopReplicaResponse.json 
b/clients/src/main/resources/common/message/StopReplicaResponse.json
index d55a7b5..d864e91 100644
--- a/clients/src/main/resources/common/message/StopReplicaResponse.json
+++ b/clients/src/main/resources/common/message/StopReplicaResponse.json
@@ -18,8 +18,8 @@
   "type": "response",
   "name": "StopReplicaResponse",
   // Version 1 is the same as version 0.
-  "validVersions": "0-1",
-  "flexibleVersions": "none",
+  "validVersions": "0-2",
+  "flexibleVersions": "2+",
   "fields": [
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
       "about": "The top-level error code, or 0 if there was no top-level 
error." },
diff --git 
a/clients/src/main/resources/common/message/UpdateMetadataRequest.json 
b/clients/src/main/resources/common/message/UpdateMetadataRequest.json
index 203ee44..3c45f83 100644
--- a/clients/src/main/resources/common/message/UpdateMetadataRequest.json
+++ b/clients/src/main/resources/common/message/UpdateMetadataRequest.json
@@ -26,8 +26,8 @@
   // Version 4 adds the offline replica list.
   //
   // Version 5 adds the broker epoch field and normalizes partitions by topic.
-  "validVersions": "0-5",
-  "flexibleVersions": "none",
+  "validVersions": "0-6",
+  "flexibleVersions": "6+",
   "fields": [
     { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
       "about": "The controller id." },
diff --git 
a/clients/src/main/resources/common/message/UpdateMetadataResponse.json 
b/clients/src/main/resources/common/message/UpdateMetadataResponse.json
index 5990beb..aaebed0 100644
--- a/clients/src/main/resources/common/message/UpdateMetadataResponse.json
+++ b/clients/src/main/resources/common/message/UpdateMetadataResponse.json
@@ -18,8 +18,8 @@
   "type": "response",
   "name": "UpdateMetadataResponse",
   // Versions 1, 2, 3, 4, and 5 are the same as version 0
-  "validVersions": "0-5",
-  "flexibleVersions": "none",
+  "validVersions": "0-6",
+  "flexibleVersions": "6+",
   "fields": [
       { "name": "ErrorCode", "type": "int16", "versions": "0+",
         "about": "The error code, or 0 if there was no error." }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/ControlledShutdownRequestTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/ControlledShutdownRequestTest.java
new file mode 100644
index 0000000..b7ec3da
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/ControlledShutdownRequestTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.ControlledShutdownRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.Test;
+
+import static org.apache.kafka.common.protocol.ApiKeys.CONTROLLED_SHUTDOWN;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class ControlledShutdownRequestTest {
+
+    @Test
+    public void testUnsupportedVersion() {
+        ControlledShutdownRequest.Builder builder = new 
ControlledShutdownRequest.Builder(
+                new ControlledShutdownRequestData().setBrokerId(1),
+                (short) (CONTROLLED_SHUTDOWN.latestVersion() + 1));
+        assertThrows(UnsupportedVersionException.class, builder::build);
+    }
+
+    @Test
+    public void testGetErrorResponse() {
+        for (short version = CONTROLLED_SHUTDOWN.oldestVersion(); version < 
CONTROLLED_SHUTDOWN.latestVersion(); version++) {
+            ControlledShutdownRequest.Builder builder = new 
ControlledShutdownRequest.Builder(
+                    new ControlledShutdownRequestData().setBrokerId(1), 
version);
+            ControlledShutdownRequest request = builder.build();
+            ControlledShutdownResponse response = request.getErrorResponse(0,
+                    new ClusterAuthorizationException("Not authorized"));
+            assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED, 
response.error());
+        }
+    }
+
+}
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java
index 6ad6197..2235a8f 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java
@@ -18,10 +18,13 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.LeaderAndIsrRequestData;
 import 
org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrLiveLeader;
 import 
org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.MessageTestUtil;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
@@ -39,10 +42,31 @@ import static java.util.Arrays.asList;
 import static java.util.Collections.emptyList;
 import static org.apache.kafka.common.protocol.ApiKeys.LEADER_AND_ISR;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 public class LeaderAndIsrRequestTest {
 
+    @Test
+    public void testUnsupportedVersion() {
+        LeaderAndIsrRequest.Builder builder = new LeaderAndIsrRequest.Builder(
+                (short) (LEADER_AND_ISR.latestVersion() + 1), 0, 0, 0,
+                Collections.emptyList(), Collections.emptySet());
+        assertThrows(UnsupportedVersionException.class, builder::build);
+    }
+
+    @Test
+    public void testGetErrorResponse() {
+        for (short version = LEADER_AND_ISR.oldestVersion(); version < 
LEADER_AND_ISR.latestVersion(); version++) {
+            LeaderAndIsrRequest.Builder builder = new 
LeaderAndIsrRequest.Builder(version, 0, 0, 0,
+                    Collections.emptyList(), Collections.emptySet());
+            LeaderAndIsrRequest request = builder.build();
+            LeaderAndIsrResponse response = request.getErrorResponse(0,
+                    new ClusterAuthorizationException("Not authorized"));
+            assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED, 
response.error());
+        }
+    }
+
     /**
      * Verifies the logic we have in LeaderAndIsrRequest to present a unified 
interface across the various versions
      * works correctly. For example, `LeaderAndIsrPartitionState.topicName` is 
not serialiazed/deserialized in
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/StopReplicaRequestTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/StopReplicaRequestTest.java
index a143ff3..5bb4998 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/StopReplicaRequestTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/StopReplicaRequestTest.java
@@ -17,17 +17,44 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.MessageTestUtil;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.Set;
 
+import static org.apache.kafka.common.protocol.ApiKeys.STOP_REPLICA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 public class StopReplicaRequestTest {
 
     @Test
+    public void testUnsupportedVersion() {
+        StopReplicaRequest.Builder builder = new StopReplicaRequest.Builder(
+                (short) (STOP_REPLICA.latestVersion() + 1),
+                0, 0, 0L, false, Collections.emptyList());
+        assertThrows(UnsupportedVersionException.class, builder::build);
+    }
+
+    @Test
+    public void testGetErrorResponse() {
+        for (short version = STOP_REPLICA.oldestVersion(); version < 
STOP_REPLICA.latestVersion(); version++) {
+            StopReplicaRequest.Builder builder = new 
StopReplicaRequest.Builder(version,
+                    0, 0, 0L, false, Collections.emptyList());
+            StopReplicaRequest request = builder.build();
+            StopReplicaResponse response = request.getErrorResponse(0,
+                    new ClusterAuthorizationException("Not authorized"));
+            assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED, 
response.error());
+        }
+    }
+
+    @Test
     public void testStopReplicaRequestNormalization() {
         Set<TopicPartition> tps = TestUtils.generateRandomTopicPartitions(10, 
10);
         StopReplicaRequest.Builder builder = new 
StopReplicaRequest.Builder((short) 5, 0, 0, 0, false, tps);
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/UpdateMetadataRequestTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/UpdateMetadataRequestTest.java
index a5d0d99..fe688ce 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/UpdateMetadataRequestTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/UpdateMetadataRequestTest.java
@@ -17,12 +17,15 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.UpdateMetadataRequestData;
 import 
org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataBroker;
 import 
org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataEndpoint;
 import 
org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.MessageTestUtil;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.test.TestUtils;
@@ -41,10 +44,31 @@ import static java.util.Arrays.asList;
 import static java.util.Collections.emptyList;
 import static org.apache.kafka.common.protocol.ApiKeys.UPDATE_METADATA;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 public class UpdateMetadataRequestTest {
 
+    @Test
+    public void testUnsupportedVersion() {
+        UpdateMetadataRequest.Builder builder = new 
UpdateMetadataRequest.Builder(
+                (short) (UPDATE_METADATA.latestVersion() + 1), 0, 0, 0,
+                Collections.emptyList(), Collections.emptyList());
+        assertThrows(UnsupportedVersionException.class, builder::build);
+    }
+
+    @Test
+    public void testGetErrorResponse() {
+        for (short version = UPDATE_METADATA.oldestVersion(); version < 
UPDATE_METADATA.latestVersion(); version++) {
+            UpdateMetadataRequest.Builder builder = new 
UpdateMetadataRequest.Builder(
+                    version, 0, 0, 0, Collections.emptyList(), 
Collections.emptyList());
+            UpdateMetadataRequest request = builder.build();
+            UpdateMetadataResponse response = request.getErrorResponse(0,
+                    new ClusterAuthorizationException("Not authorized"));
+            assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED, 
response.error());
+        }
+    }
+
     /**
      * Verifies the logic we have in UpdateMetadataRequest to present a 
unified interface across the various versions
      * works correctly. For example, `UpdateMetadataPartitionState.topicName` 
is not serialiazed/deserialized in
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala 
b/core/src/main/scala/kafka/api/ApiVersion.scala
index 1a33625..90ffb10 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -92,7 +92,9 @@ object ApiVersion {
     // Add rack_id to FetchRequest, preferred_read_replica to FetchResponse, 
and replica_id to OffsetsForLeaderRequest
     KAFKA_2_3_IV1,
     // Add adding_replicas and removing_replicas fields to LeaderAndIsrRequest
-    KAFKA_2_4_IV0
+    KAFKA_2_4_IV0,
+    // Flexible version support in inter-broker APIs
+    KAFKA_2_4_IV1
   )
 
   // Map keys are the union of the short and full versions
@@ -325,6 +327,13 @@ case object KAFKA_2_4_IV0 extends DefaultApiVersion {
   val id: Int = 24
 }
 
+case object KAFKA_2_4_IV1 extends DefaultApiVersion {
+  val shortVersion: String = "2.4"
+  val subVersion = "IV1"
+  val recordVersion = RecordVersion.V2
+  val id: Int = 25
+}
+
 object ApiVersionValidator extends Validator {
 
   override def ensureValid(name: String, value: Any): Unit = {
diff --git 
a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 6c7450a..c60130b 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -447,7 +447,8 @@ abstract class AbstractControllerBrokerRequestBatch(config: 
KafkaConfig,
 
   private def sendLeaderAndIsrRequest(controllerEpoch: Int, stateChangeLog: 
StateChangeLogger): Unit = {
     val leaderAndIsrRequestVersion: Short =
-      if (config.interBrokerProtocolVersion >= KAFKA_2_4_IV0) 3
+      if (config.interBrokerProtocolVersion >= KAFKA_2_4_IV1) 4
+      else if (config.interBrokerProtocolVersion >= KAFKA_2_4_IV0) 3
       else if (config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 2
       else if (config.interBrokerProtocolVersion >= KAFKA_1_0_IV0) 1
       else 0
@@ -483,7 +484,8 @@ abstract class AbstractControllerBrokerRequestBatch(config: 
KafkaConfig,
 
     val partitionStates = updateMetadataRequestPartitionInfoMap.values.toBuffer
     val updateMetadataRequestVersion: Short =
-      if (config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 5
+      if (config.interBrokerProtocolVersion >= KAFKA_2_4_IV1) 6
+      else if (config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 5
       else if (config.interBrokerProtocolVersion >= KAFKA_1_0_IV0) 4
       else if (config.interBrokerProtocolVersion >= KAFKA_0_10_2_IV0) 3
       else if (config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV1) 2
@@ -528,7 +530,8 @@ abstract class AbstractControllerBrokerRequestBatch(config: 
KafkaConfig,
 
   private def sendStopReplicaRequests(controllerEpoch: Int): Unit = {
     val stopReplicaRequestVersion: Short =
-      if (config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 1
+      if (config.interBrokerProtocolVersion >= KAFKA_2_4_IV1) 2
+      else if (config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 1
       else 0
 
     def stopReplicaPartitionDeleteResponseCallback(brokerId: Int)(response: 
AbstractResponse): Unit = {
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 57cb0b6..de0ee22 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -24,7 +24,7 @@ import java.util.concurrent._
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 
 import com.yammer.metrics.core.Gauge
-import kafka.api.{KAFKA_0_9_0, KAFKA_2_2_IV0}
+import kafka.api.{KAFKA_0_9_0, KAFKA_2_2_IV0, KAFKA_2_4_IV1}
 import kafka.cluster.Broker
 import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException, 
InconsistentBrokerMetadataException, InconsistentClusterIdException}
 import kafka.controller.KafkaController
@@ -528,7 +528,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
               val controlledShutdownApiVersion: Short =
                 if (config.interBrokerProtocolVersion < KAFKA_0_9_0) 0
                 else if (config.interBrokerProtocolVersion < KAFKA_2_2_IV0) 1
-                else 2
+                else if (config.interBrokerProtocolVersion < KAFKA_2_4_IV1) 2
+                else 3
 
               val controlledShutdownRequest = new 
ControlledShutdownRequest.Builder(
                   new ControlledShutdownRequestData()
diff --git a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala 
b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
index dadef1d..0519ea0 100644
--- a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
+++ b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
@@ -93,8 +93,9 @@ class ApiVersionTest {
     assertEquals(KAFKA_2_3_IV0, ApiVersion("2.3-IV0"))
     assertEquals(KAFKA_2_3_IV1, ApiVersion("2.3-IV1"))
 
-    assertEquals(KAFKA_2_4_IV0, ApiVersion("2.4"))
+    assertEquals(KAFKA_2_4_IV1, ApiVersion("2.4"))
     assertEquals(KAFKA_2_4_IV0, ApiVersion("2.4-IV0"))
+    assertEquals(KAFKA_2_4_IV1, ApiVersion("2.4-IV1"))
   }
 
   @Test
diff --git 
a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala 
b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
index e603b25..88607f2 100644
--- 
a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
@@ -18,7 +18,7 @@ package kafka.controller
 
 import java.util.Properties
 
-import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, KAFKA_0_10_2_IV0, KAFKA_0_9_0, 
KAFKA_1_0_IV0, KAFKA_2_2_IV0, KAFKA_2_4_IV0, LeaderAndIsr}
+import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, KAFKA_0_10_2_IV0, KAFKA_0_9_0, 
KAFKA_1_0_IV0, KAFKA_2_2_IV0, KAFKA_2_4_IV0, KAFKA_2_4_IV1, LeaderAndIsr}
 import kafka.cluster.{Broker, EndPoint}
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
@@ -157,7 +157,8 @@ class ControllerChannelManagerTest {
 
     for (apiVersion <- ApiVersion.allVersions) {
       val leaderAndIsrRequestVersion: Short =
-        if (apiVersion >= KAFKA_2_4_IV0) 3
+        if (apiVersion >= KAFKA_2_4_IV1) 4
+        else if (apiVersion >= KAFKA_2_4_IV0) 3
         else if (apiVersion >= KAFKA_2_2_IV0) 2
         else if (apiVersion >= KAFKA_1_0_IV0) 1
         else 0
@@ -326,7 +327,8 @@ class ControllerChannelManagerTest {
 
     for (apiVersion <- ApiVersion.allVersions) {
       val updateMetadataRequestVersion: Short =
-        if (apiVersion >= KAFKA_2_2_IV0) 5
+        if (apiVersion >= KAFKA_2_4_IV1) 6
+        else if (apiVersion >= KAFKA_2_2_IV0) 5
         else if (apiVersion >= KAFKA_1_0_IV0) 4
         else if (apiVersion >= KAFKA_0_10_2_IV0) 3
         else if (apiVersion >= KAFKA_0_10_0_IV1) 2
@@ -597,8 +599,10 @@ class ControllerChannelManagerTest {
     for (apiVersion <- ApiVersion.allVersions) {
       if (apiVersion < KAFKA_2_2_IV0)
         testStopReplicaFollowsInterBrokerProtocolVersion(apiVersion, 0.toShort)
-      else
+      else if (apiVersion < KAFKA_2_4_IV1)
         testStopReplicaFollowsInterBrokerProtocolVersion(apiVersion, 1.toShort)
+      else
+        testStopReplicaFollowsInterBrokerProtocolVersion(apiVersion, 2.toShort)
     }
   }
 

Reply via email to