This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 1639e13  KAFKA-13053; Bump kraft frame version for incompatible 
changes from 2.8 (#11010)
1639e13 is described below

commit 1639e13cde6f61e13ebd2e00701dfb3bd037dc77
Author: Jason Gustafson <[email protected]>
AuthorDate: Fri Jul 9 12:18:34 2021 -0700

    KAFKA-13053; Bump kraft frame version for incompatible changes from 2.8 (#11010)
    
    This patch bumps the default frame version for kraft records from 0 to 1.
    At the same time, we reset all record versions back to 0 and we enable
    flexible version support for UnregisterBrokerRecord, which was missed
    previously. Note that the frame version bump also affects the KIP-405
    records since they share AbstractApiMessageSerde. Since these records
    were not part of any previous release, this should not cause an issue.
    
    Reviewers: Colin P. McCabe <[email protected]>
---
 .../common/metadata/ClientQuotaRecord.json         |  4 ++--
 .../resources/common/metadata/ConfigRecord.json    |  6 ++---
 .../common/metadata/FenceBrokerRecord.json         |  4 ++--
 .../resources/common/metadata/PartitionRecord.json |  4 ++--
 .../common/metadata/RegisterBrokerRecord.json      |  6 ++---
 .../common/metadata/RemoveTopicRecord.json         |  4 ++--
 .../resources/common/metadata/TopicRecord.json     |  4 ++--
 .../common/metadata/UnfenceBrokerRecord.json       |  4 ++--
 .../common/metadata/UnregisterBrokerRecord.json    |  2 +-
 .../controller/ClientQuotaControlManagerTest.java  | 22 +++++++++---------
 .../controller/ClusterControlManagerTest.java      |  6 ++---
 .../ConfigurationControlManagerTest.java           | 12 +++++-----
 .../kafka/controller/QuorumControllerTest.java     | 26 +++++++++-------------
 .../controller/ReplicationControlManagerTest.java  |  4 ++--
 .../kafka/metadata/MetadataRecordSerdeTest.java    | 10 ++++-----
 .../serialization/AbstractApiMessageSerde.java     |  7 ++++--
 16 files changed, 62 insertions(+), 63 deletions(-)

diff --git a/metadata/src/main/resources/common/metadata/ClientQuotaRecord.json 
b/metadata/src/main/resources/common/metadata/ClientQuotaRecord.json
index 737f6e0..9bb7aca 100644
--- a/metadata/src/main/resources/common/metadata/ClientQuotaRecord.json
+++ b/metadata/src/main/resources/common/metadata/ClientQuotaRecord.json
@@ -17,8 +17,8 @@
   "apiKey": 14,
   "type": "metadata",
   "name": "ClientQuotaRecord",
-  "validVersions": "0-1",
-  "flexibleVersions": "1+",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
   "fields": [
     { "name": "Entity", "type": "[]EntityData", "versions": "0+",
       "about": "The quota entity to alter.", "fields": [
diff --git a/metadata/src/main/resources/common/metadata/ConfigRecord.json 
b/metadata/src/main/resources/common/metadata/ConfigRecord.json
index 3c075e8..a0f0c3a 100644
--- a/metadata/src/main/resources/common/metadata/ConfigRecord.json
+++ b/metadata/src/main/resources/common/metadata/ConfigRecord.json
@@ -17,8 +17,8 @@
   "apiKey": 4,
   "type": "metadata",
   "name": "ConfigRecord",
-  "validVersions": "0-1",
-  "flexibleVersions": "1+",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
   "fields": [
     { "name": "ResourceType", "type": "int8", "versions": "0+",
       "about": "The type of resource this configuration applies to." },
@@ -26,7 +26,7 @@
       "about": "The name of the resource this configuration applies to." },
     { "name": "Name", "type": "string", "versions": "0+",
       "about": "The name of the configuration key." },
-    { "name": "Value", "type": "string", "versions": "0+", "nullableVersions": 
"1+",
+    { "name": "Value", "type": "string", "versions": "0+", "nullableVersions": 
"0+",
       "about": "The value of the configuration, or null if the it should be 
deleted." }
   ]
 }
diff --git a/metadata/src/main/resources/common/metadata/FenceBrokerRecord.json 
b/metadata/src/main/resources/common/metadata/FenceBrokerRecord.json
index 15935f8..0cd29be 100644
--- a/metadata/src/main/resources/common/metadata/FenceBrokerRecord.json
+++ b/metadata/src/main/resources/common/metadata/FenceBrokerRecord.json
@@ -17,8 +17,8 @@
   "apiKey": 7,
   "type": "metadata",
   "name": "FenceBrokerRecord",
-  "validVersions": "0-1",
-  "flexibleVersions": "1+",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
   "fields": [
     { "name": "Id", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
       "about": "The broker ID to fence. It will be removed from all ISRs." },
diff --git a/metadata/src/main/resources/common/metadata/PartitionRecord.json 
b/metadata/src/main/resources/common/metadata/PartitionRecord.json
index 233f852..66a13e2 100644
--- a/metadata/src/main/resources/common/metadata/PartitionRecord.json
+++ b/metadata/src/main/resources/common/metadata/PartitionRecord.json
@@ -17,8 +17,8 @@
   "apiKey": 3,
   "type": "metadata",
   "name": "PartitionRecord",
-  "validVersions": "0-1",
-  "flexibleVersions": "1+",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
   "fields": [
     { "name": "PartitionId", "type": "int32", "versions": "0+", "default": 
"-1",
       "about": "The partition id." },
diff --git 
a/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json 
b/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
index 89ddf2c..a0e7af2 100644
--- a/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
+++ b/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
@@ -17,8 +17,8 @@
   "apiKey": 0,
   "type": "metadata",
   "name": "RegisterBrokerRecord",
-  "validVersions": "0-1",
-  "flexibleVersions": "1+",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
   "fields": [
     { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
       "about": "The broker id." },
@@ -48,7 +48,7 @@
     ]},
     { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": 
"0+",
       "about": "The broker rack." },
-    { "name": "Fenced", "type": "bool", "versions": "1+", "default": "true",
+    { "name": "Fenced", "type": "bool", "versions": "0+", "default": "true",
       "about": "True if the broker is fenced." }
   ]
 }
diff --git a/metadata/src/main/resources/common/metadata/RemoveTopicRecord.json 
b/metadata/src/main/resources/common/metadata/RemoveTopicRecord.json
index f6dc948..be290e3 100644
--- a/metadata/src/main/resources/common/metadata/RemoveTopicRecord.json
+++ b/metadata/src/main/resources/common/metadata/RemoveTopicRecord.json
@@ -17,8 +17,8 @@
   "apiKey": 9,
   "type": "metadata",
   "name": "RemoveTopicRecord",
-  "validVersions": "0-1",
-  "flexibleVersions": "1+",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
   "fields": [
     { "name": "TopicId", "type": "uuid", "versions": "0+",
       "about": "The topic to remove. All associated partitions will be removed 
as well." }
diff --git a/metadata/src/main/resources/common/metadata/TopicRecord.json 
b/metadata/src/main/resources/common/metadata/TopicRecord.json
index 8b4f8d9..6fa5a05 100644
--- a/metadata/src/main/resources/common/metadata/TopicRecord.json
+++ b/metadata/src/main/resources/common/metadata/TopicRecord.json
@@ -17,8 +17,8 @@
   "apiKey": 2,
   "type": "metadata",
   "name": "TopicRecord",
-  "validVersions": "0-1",
-  "flexibleVersions": "1+",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
   "fields": [
     { "name": "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",
       "about": "The topic name." },
diff --git 
a/metadata/src/main/resources/common/metadata/UnfenceBrokerRecord.json 
b/metadata/src/main/resources/common/metadata/UnfenceBrokerRecord.json
index 8a6f944..92770a6 100644
--- a/metadata/src/main/resources/common/metadata/UnfenceBrokerRecord.json
+++ b/metadata/src/main/resources/common/metadata/UnfenceBrokerRecord.json
@@ -17,8 +17,8 @@
   "apiKey": 8,
   "type": "metadata",
   "name": "UnfenceBrokerRecord",
-  "validVersions": "0-1",
-  "flexibleVersions": "1+",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
   "fields": [
     { "name": "Id", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
       "about": "The broker ID to unfence." },
diff --git 
a/metadata/src/main/resources/common/metadata/UnregisterBrokerRecord.json 
b/metadata/src/main/resources/common/metadata/UnregisterBrokerRecord.json
index 58569bf..358d88a 100644
--- a/metadata/src/main/resources/common/metadata/UnregisterBrokerRecord.json
+++ b/metadata/src/main/resources/common/metadata/UnregisterBrokerRecord.json
@@ -18,7 +18,7 @@
   "type": "metadata",
   "name": "UnregisterBrokerRecord",
   "validVersions": "0",
-  "flexibleVersions": "none",
+  "flexibleVersions": "0+",
   "fields": [
     { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
       "about": "The broker id." },
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ClientQuotaControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ClientQuotaControlManagerTest.java
index e7039d2..b915db3 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ClientQuotaControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ClientQuotaControlManagerTest.java
@@ -211,42 +211,42 @@ public class ClientQuotaControlManagerTest {
             Arrays.asList(new ApiMessageAndVersion(new 
ClientQuotaRecord().setEntity(Arrays.asList(
                 new EntityData().setEntityType("user").setEntityName("user-1"),
                 new 
EntityData().setEntityType("client-id").setEntityName("client-id-1"))).
-                    
setKey("request_percentage").setValue(50.5).setRemove(false), (short) 1)),
+                    
setKey("request_percentage").setValue(50.5).setRemove(false), (short) 0)),
             Arrays.asList(new ApiMessageAndVersion(new 
ClientQuotaRecord().setEntity(Arrays.asList(
                 new EntityData().setEntityType("user").setEntityName("user-2"),
                 new 
EntityData().setEntityType("client-id").setEntityName("client-id-1"))).
-                    
setKey("request_percentage").setValue(51.51).setRemove(false), (short) 1)),
+                    
setKey("request_percentage").setValue(51.51).setRemove(false), (short) 0)),
             Arrays.asList(new ApiMessageAndVersion(new 
ClientQuotaRecord().setEntity(Arrays.asList(
                 new EntityData().setEntityType("user").setEntityName("user-3"),
                 new 
EntityData().setEntityType("client-id").setEntityName("client-id-2"))).
-                    
setKey("request_percentage").setValue(52.52).setRemove(false), (short) 1)),
+                    
setKey("request_percentage").setValue(52.52).setRemove(false), (short) 0)),
             Arrays.asList(new ApiMessageAndVersion(new 
ClientQuotaRecord().setEntity(Arrays.asList(
                 new EntityData().setEntityType("user").setEntityName(null),
                 new 
EntityData().setEntityType("client-id").setEntityName("client-id-1"))).
-                    
setKey("request_percentage").setValue(53.53).setRemove(false), (short) 1)),
+                    
setKey("request_percentage").setValue(53.53).setRemove(false), (short) 0)),
             Arrays.asList(new ApiMessageAndVersion(new 
ClientQuotaRecord().setEntity(Arrays.asList(
                 new EntityData().setEntityType("user").setEntityName("user-1"),
                 new 
EntityData().setEntityType("client-id").setEntityName(null))).
-                    
setKey("request_percentage").setValue(54.54).setRemove(false), (short) 1)),
+                    
setKey("request_percentage").setValue(54.54).setRemove(false), (short) 0)),
             Arrays.asList(new ApiMessageAndVersion(new 
ClientQuotaRecord().setEntity(Arrays.asList(
                 new EntityData().setEntityType("user").setEntityName("user-3"),
                 new 
EntityData().setEntityType("client-id").setEntityName(null))).
-                    
setKey("request_percentage").setValue(55.55).setRemove(false), (short) 1)),
+                    
setKey("request_percentage").setValue(55.55).setRemove(false), (short) 0)),
             Arrays.asList(new ApiMessageAndVersion(new 
ClientQuotaRecord().setEntity(Arrays.asList(
                 new 
EntityData().setEntityType("user").setEntityName("user-1"))).
-                    
setKey("request_percentage").setValue(56.56).setRemove(false), (short) 1)),
+                    
setKey("request_percentage").setValue(56.56).setRemove(false), (short) 0)),
             Arrays.asList(new ApiMessageAndVersion(new 
ClientQuotaRecord().setEntity(Arrays.asList(
                 new 
EntityData().setEntityType("user").setEntityName("user-2"))).
-                    
setKey("request_percentage").setValue(57.57).setRemove(false), (short) 1)),
+                    
setKey("request_percentage").setValue(57.57).setRemove(false), (short) 0)),
             Arrays.asList(new ApiMessageAndVersion(new 
ClientQuotaRecord().setEntity(Arrays.asList(
                 new 
EntityData().setEntityType("user").setEntityName("user-3"))).
-                    
setKey("request_percentage").setValue(58.58).setRemove(false), (short) 1)),
+                    
setKey("request_percentage").setValue(58.58).setRemove(false), (short) 0)),
             Arrays.asList(new ApiMessageAndVersion(new 
ClientQuotaRecord().setEntity(Arrays.asList(
                 new EntityData().setEntityType("user").setEntityName(null))).
-                    
setKey("request_percentage").setValue(59.59).setRemove(false), (short) 1)),
+                    
setKey("request_percentage").setValue(59.59).setRemove(false), (short) 0)),
             Arrays.asList(new ApiMessageAndVersion(new 
ClientQuotaRecord().setEntity(Arrays.asList(
                 new 
EntityData().setEntityType("client-id").setEntityName("client-id-2"))).
-                    
setKey("request_percentage").setValue(60.60).setRemove(false), (short) 1))),
+                    
setKey("request_percentage").setValue(60.60).setRemove(false), (short) 0))),
             manager.iterator(Long.MAX_VALUE));
     }
 
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index 1fae0aa..195b02a 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -185,7 +185,7 @@ public class ClusterControlManagerTest {
                         setPort((short) 9092).
                         setName("PLAINTEXT").
                         setHost("example.com")).iterator())).
-                setFenced(false), (short) 1)),
+                setFenced(false), (short) 0)),
             Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
                 setBrokerEpoch(100).setBrokerId(1).setRack(null).
                 setEndPoints(new 
BrokerEndpointCollection(Collections.singleton(
@@ -193,7 +193,7 @@ public class ClusterControlManagerTest {
                         setPort((short) 9093).
                         setName("PLAINTEXT").
                         setHost("example.com")).iterator())).
-                setFenced(false), (short) 1)),
+                setFenced(false), (short) 0)),
             Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
                 setBrokerEpoch(100).setBrokerId(2).setRack(null).
                 setEndPoints(new 
BrokerEndpointCollection(Collections.singleton(
@@ -201,7 +201,7 @@ public class ClusterControlManagerTest {
                         setPort((short) 9094).
                         setName("PLAINTEXT").
                         setHost("example.com")).iterator())).
-                setFenced(true), (short) 1))),
+                setFenced(true), (short) 0))),
                 clusterControl.iterator(Long.MAX_VALUE));
     }
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index ec15e66..9cba393 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -107,10 +107,10 @@ public class ConfigurationControlManagerTest {
         RecordTestUtils.assertBatchIteratorContains(Arrays.asList(
             Arrays.asList(new ApiMessageAndVersion(new ConfigRecord().
                     setResourceType(TOPIC.id()).setResourceName("mytopic").
-                    setName("abc").setValue("x,y,z"), (short) 1),
+                    setName("abc").setValue("x,y,z"), (short) 0),
                 new ApiMessageAndVersion(new ConfigRecord().
                     setResourceType(TOPIC.id()).setResourceName("mytopic").
-                    setName("def").setValue("blah"), (short) 1))),
+                    setName("def").setValue("blah"), (short) 0))),
             manager.iterator(Long.MAX_VALUE));
     }
 
@@ -146,7 +146,7 @@ public class ConfigurationControlManagerTest {
             new ConfigurationControlManager(new LogContext(), 
snapshotRegistry, CONFIGS);
         assertEquals(ControllerResult.atomicOf(Collections.singletonList(new 
ApiMessageAndVersion(
                 new 
ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
-                    setName("abc").setValue("123"), (short) 1)),
+                    setName("abc").setValue("123"), (short) 0)),
                 toMap(entry(BROKER0, new ApiError(Errors.INVALID_REQUEST,
                             "A DELETE op was given with a non-null value.")),
                     entry(MYTOPIC, ApiError.NONE))),
@@ -187,10 +187,10 @@ public class ConfigurationControlManagerTest {
         List<ApiMessageAndVersion> expectedRecords1 = Arrays.asList(
             new ApiMessageAndVersion(new ConfigRecord().
                 setResourceType(TOPIC.id()).setResourceName("mytopic").
-                setName("abc").setValue("456"), (short) 1),
+                setName("abc").setValue("456"), (short) 0),
             new ApiMessageAndVersion(new ConfigRecord().
                 setResourceType(TOPIC.id()).setResourceName("mytopic").
-                setName("def").setValue("901"), (short) 1));
+                setName("def").setValue("901"), (short) 0));
         assertEquals(
             ControllerResult.atomicOf(
                 expectedRecords1,
@@ -212,7 +212,7 @@ public class ConfigurationControlManagerTest {
                             .setResourceName("mytopic")
                             .setName("abc")
                             .setValue(null),
-                        (short) 1
+                        (short) 0
                     )
                 ),
                 toMap(entry(MYTOPIC, ApiError.NONE))
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 45c969d..38a368d 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -37,20 +37,20 @@ import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
-import 
org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic;
 import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
+import 
org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic;
 import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
 import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
 import org.apache.kafka.common.message.BrokerRegistrationRequestData.Listener;
 import 
org.apache.kafka.common.message.BrokerRegistrationRequestData.ListenerCollection;
-import org.apache.kafka.common.message.BrokerRegistrationRequestData;
 import 
org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
 import 
org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
 import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
 import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignmentCollection;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
 import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
-import org.apache.kafka.common.message.CreateTopicsRequestData;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
 import org.apache.kafka.common.message.ElectLeadersRequestData;
 import org.apache.kafka.common.message.ElectLeadersResponseData;
@@ -58,9 +58,9 @@ import 
org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
 import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
 import org.apache.kafka.common.metadata.PartitionRecord;
 import org.apache.kafka.common.metadata.ProducerIdsRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;
 import 
org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpointCollection;
-import org.apache.kafka.common.metadata.RegisterBrokerRecord;
 import org.apache.kafka.common.metadata.TopicRecord;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
@@ -77,8 +77,6 @@ import org.apache.kafka.snapshot.RawSnapshotReader;
 import org.apache.kafka.snapshot.SnapshotReader;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static java.util.concurrent.TimeUnit.HOURS;
 import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
@@ -93,8 +91,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Timeout(value = 40)
 public class QuorumControllerTest {
-    private static final Logger log =
-        LoggerFactory.getLogger(QuorumControllerTest.class);
 
     /**
      * Test creating a new QuorumController and closing it.
@@ -424,17 +420,17 @@ public class QuorumControllerTest {
     private List<ApiMessageAndVersion> expectedSnapshotContent(Uuid fooId, 
Map<Integer, Long> brokerEpochs) {
         return Arrays.asList(
             new ApiMessageAndVersion(new TopicRecord().
-                setName("foo").setTopicId(fooId), (short) 1),
+                setName("foo").setTopicId(fooId), (short) 0),
             new ApiMessageAndVersion(new PartitionRecord().setPartitionId(0).
                 setTopicId(fooId).setReplicas(Arrays.asList(0, 1, 2)).
                 setIsr(Arrays.asList(0, 1, 
2)).setRemovingReplicas(Collections.emptyList()).
                 
setAddingReplicas(Collections.emptyList()).setLeader(0).setLeaderEpoch(0).
-                setPartitionEpoch(0), (short) 1),
+                setPartitionEpoch(0), (short) 0),
             new ApiMessageAndVersion(new PartitionRecord().setPartitionId(1).
                 setTopicId(fooId).setReplicas(Arrays.asList(1, 2, 0)).
                 setIsr(Arrays.asList(1, 2, 
0)).setRemovingReplicas(Collections.emptyList()).
                 
setAddingReplicas(Collections.emptyList()).setLeader(1).setLeaderEpoch(0).
-                setPartitionEpoch(0), (short) 1),
+                setPartitionEpoch(0), (short) 0),
             new ApiMessageAndVersion(new RegisterBrokerRecord().
                 setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0)).
                 setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB0")).
@@ -444,7 +440,7 @@ public class QuorumControllerTest {
                             new 
BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
                             setPort(9092).setSecurityProtocol((short) 
0)).iterator())).
                 setRack(null).
-                setFenced(false), (short) 1),
+                setFenced(false), (short) 0),
             new ApiMessageAndVersion(new RegisterBrokerRecord().
                 setBrokerId(1).setBrokerEpoch(brokerEpochs.get(1)).
                 setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB1")).
@@ -454,7 +450,7 @@ public class QuorumControllerTest {
                             new 
BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
                             setPort(9093).setSecurityProtocol((short) 
0)).iterator())).
                 setRack(null).
-                setFenced(false), (short) 1),
+                setFenced(false), (short) 0),
             new ApiMessageAndVersion(new RegisterBrokerRecord().
                 setBrokerId(2).setBrokerEpoch(brokerEpochs.get(2)).
                 setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB2")).
@@ -464,14 +460,14 @@ public class QuorumControllerTest {
                             new 
BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
                             setPort(9094).setSecurityProtocol((short) 
0)).iterator())).
                 setRack(null).
-                setFenced(false), (short) 1),
+                setFenced(false), (short) 0),
             new ApiMessageAndVersion(new RegisterBrokerRecord().
                 setBrokerId(3).setBrokerEpoch(brokerEpochs.get(3)).
                 setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB3")).
                 setEndPoints(new BrokerEndpointCollection(Arrays.asList(
                     new 
BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
                         setPort(9095).setSecurityProtocol((short) 
0)).iterator())).
-                setRack(null), (short) 1),
+                setRack(null), (short) 0),
             new ApiMessageAndVersion(new ProducerIdsRecord().
                 setBrokerId(0).
                 setBrokerEpoch(brokerEpochs.get(0)).
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 6298c95..0182477 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -201,9 +201,9 @@ public class ReplicationControlManagerTest {
                     setPartitionId(0).setTopicId(fooId).
                     setReplicas(Arrays.asList(1, 2, 
0)).setIsr(Arrays.asList(1, 2, 0)).
                     
setRemovingReplicas(Collections.emptyList()).setAddingReplicas(Collections.emptyList()).setLeader(1).
-                    setLeaderEpoch(0).setPartitionEpoch(0), (short) 1),
+                    setLeaderEpoch(0).setPartitionEpoch(0), (short) 0),
                 new ApiMessageAndVersion(new TopicRecord().
-                    setTopicId(fooId).setName("foo"), (short) 1))),
+                    setTopicId(fooId).setName("foo"), (short) 0))),
             ctx.replicationControl.iterator(Long.MAX_VALUE));
     }
 
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java 
b/metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java
index 3a25b8d..cbcbe85 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java
@@ -101,7 +101,7 @@ class MetadataRecordSerdeTest {
         MetadataRecordSerde serde = new MetadataRecordSerde();
         ByteBuffer buffer = ByteBuffer.allocate(64);
         buffer.clear();
-        buffer.put((byte) 0x00);
+        buffer.put((byte) 0x01);
         buffer.put((byte) 0x80);
         buffer.put((byte) 0x80);
         buffer.put((byte) 0x80);
@@ -123,7 +123,7 @@ class MetadataRecordSerdeTest {
         MetadataRecordSerde serde = new MetadataRecordSerde();
         ByteBuffer buffer = ByteBuffer.allocate(64);
         buffer.clear();
-        buffer.put((byte) 0x00);
+        buffer.put((byte) 0x01);
         buffer.put((byte) 0x08);
         buffer.put((byte) 0x80);
         buffer.put((byte) 0x80);
@@ -145,7 +145,7 @@ class MetadataRecordSerdeTest {
         MetadataRecordSerde serde = new MetadataRecordSerde();
         ByteBuffer buffer = ByteBuffer.allocate(64);
         buffer.clear();
-        buffer.put((byte) 0x00); // frame version
+        buffer.put((byte) 0x01); // frame version
         buffer.put((byte) 0x08); // apiKey
         buffer.put((byte) 0xff); // api version
         buffer.put((byte) 0xff); // api version
@@ -166,7 +166,7 @@ class MetadataRecordSerdeTest {
     public void testParsingUnsupportedApiKey() {
         MetadataRecordSerde serde = new MetadataRecordSerde();
         ByteBuffer buffer = ByteBuffer.allocate(64);
-        buffer.put((byte) 0x00); // frame version
+        buffer.put((byte) 0x01); // frame version
         buffer.put((byte) 0xff); // apiKey
         buffer.put((byte) 0x7f); // apiKey
         buffer.put((byte) 0x00); // api version
@@ -185,7 +185,7 @@ class MetadataRecordSerdeTest {
     public void testParsingMalformedMessage() {
         MetadataRecordSerde serde = new MetadataRecordSerde();
         ByteBuffer buffer = ByteBuffer.allocate(4);
-        buffer.put((byte) 0x00); // frame version
+        buffer.put((byte) 0x01); // frame version
         buffer.put((byte) 0x00); // apiKey
         buffer.put((byte) 0x00); // apiVersion
         buffer.put((byte) 0x80); // malformed data
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java
index d292e39..7533178 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java
@@ -42,7 +42,7 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
  * </pre>
  */
 public abstract class AbstractApiMessageSerde implements 
RecordSerde<ApiMessageAndVersion> {
-    private static final short DEFAULT_FRAME_VERSION = 0;
+    private static final short DEFAULT_FRAME_VERSION = 1;
     private static final int DEFAULT_FRAME_VERSION_SIZE = 
ByteUtils.sizeOfUnsignedVarint(DEFAULT_FRAME_VERSION);
 
     private static short unsignedIntToShort(Readable input, String entity) {
@@ -83,7 +83,10 @@ public abstract class AbstractApiMessageSerde implements 
RecordSerde<ApiMessageA
                                      int size) {
         short frameVersion = unsignedIntToShort(input, "frame version");
 
-        if (frameVersion != DEFAULT_FRAME_VERSION) {
+        if (frameVersion == 0) {
+            throw new MetadataParseException("Could not deserialize metadata 
record with frame version 0. " +
+                "Note that upgrades from the preview release of KRaft in 2.8 
to newer versions are not supported.");
+        } else if (frameVersion != DEFAULT_FRAME_VERSION) {
             throw new MetadataParseException("Could not deserialize metadata 
record due to unknown frame version "
                     + frameVersion + "(only frame version " + 
DEFAULT_FRAME_VERSION + " is supported)");
         }

Reply via email to