This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 1639e13 KAFKA-13053; Bump kraft frame version for incompatible
changes from 2.8 (#11010)
1639e13 is described below
commit 1639e13cde6f61e13ebd2e00701dfb3bd037dc77
Author: Jason Gustafson <[email protected]>
AuthorDate: Fri Jul 9 12:18:34 2021 -0700
KAFKA-13053; Bump kraft frame version for incompatible changes from 2.8
(#11010)
This patch bumps the default frame version for kraft records from 0 to 1.
At the same time, we reset all
record versions back to 0 and we enable flexible version support for
UnregisterBrokerRecord, which was
missed previously. Note that the frame version bump also affects the
KIP-405 records since they
share AbstractApiMessageSerde. Since these records were not part of any
previous release, this should
not cause an issue.
Reviewers: Colin P. McCabe <[email protected]>
---
.../common/metadata/ClientQuotaRecord.json | 4 ++--
.../resources/common/metadata/ConfigRecord.json | 6 ++---
.../common/metadata/FenceBrokerRecord.json | 4 ++--
.../resources/common/metadata/PartitionRecord.json | 4 ++--
.../common/metadata/RegisterBrokerRecord.json | 6 ++---
.../common/metadata/RemoveTopicRecord.json | 4 ++--
.../resources/common/metadata/TopicRecord.json | 4 ++--
.../common/metadata/UnfenceBrokerRecord.json | 4 ++--
.../common/metadata/UnregisterBrokerRecord.json | 2 +-
.../controller/ClientQuotaControlManagerTest.java | 22 +++++++++---------
.../controller/ClusterControlManagerTest.java | 6 ++---
.../ConfigurationControlManagerTest.java | 12 +++++-----
.../kafka/controller/QuorumControllerTest.java | 26 +++++++++-------------
.../controller/ReplicationControlManagerTest.java | 4 ++--
.../kafka/metadata/MetadataRecordSerdeTest.java | 10 ++++-----
.../serialization/AbstractApiMessageSerde.java | 7 ++++--
16 files changed, 62 insertions(+), 63 deletions(-)
diff --git a/metadata/src/main/resources/common/metadata/ClientQuotaRecord.json
b/metadata/src/main/resources/common/metadata/ClientQuotaRecord.json
index 737f6e0..9bb7aca 100644
--- a/metadata/src/main/resources/common/metadata/ClientQuotaRecord.json
+++ b/metadata/src/main/resources/common/metadata/ClientQuotaRecord.json
@@ -17,8 +17,8 @@
"apiKey": 14,
"type": "metadata",
"name": "ClientQuotaRecord",
- "validVersions": "0-1",
- "flexibleVersions": "1+",
+ "validVersions": "0",
+ "flexibleVersions": "0+",
"fields": [
{ "name": "Entity", "type": "[]EntityData", "versions": "0+",
"about": "The quota entity to alter.", "fields": [
diff --git a/metadata/src/main/resources/common/metadata/ConfigRecord.json
b/metadata/src/main/resources/common/metadata/ConfigRecord.json
index 3c075e8..a0f0c3a 100644
--- a/metadata/src/main/resources/common/metadata/ConfigRecord.json
+++ b/metadata/src/main/resources/common/metadata/ConfigRecord.json
@@ -17,8 +17,8 @@
"apiKey": 4,
"type": "metadata",
"name": "ConfigRecord",
- "validVersions": "0-1",
- "flexibleVersions": "1+",
+ "validVersions": "0",
+ "flexibleVersions": "0+",
"fields": [
{ "name": "ResourceType", "type": "int8", "versions": "0+",
"about": "The type of resource this configuration applies to." },
@@ -26,7 +26,7 @@
"about": "The name of the resource this configuration applies to." },
{ "name": "Name", "type": "string", "versions": "0+",
"about": "The name of the configuration key." },
- { "name": "Value", "type": "string", "versions": "0+", "nullableVersions":
"1+",
+ { "name": "Value", "type": "string", "versions": "0+", "nullableVersions":
"0+",
"about": "The value of the configuration, or null if the it should be
deleted." }
]
}
diff --git a/metadata/src/main/resources/common/metadata/FenceBrokerRecord.json
b/metadata/src/main/resources/common/metadata/FenceBrokerRecord.json
index 15935f8..0cd29be 100644
--- a/metadata/src/main/resources/common/metadata/FenceBrokerRecord.json
+++ b/metadata/src/main/resources/common/metadata/FenceBrokerRecord.json
@@ -17,8 +17,8 @@
"apiKey": 7,
"type": "metadata",
"name": "FenceBrokerRecord",
- "validVersions": "0-1",
- "flexibleVersions": "1+",
+ "validVersions": "0",
+ "flexibleVersions": "0+",
"fields": [
{ "name": "Id", "type": "int32", "versions": "0+", "entityType":
"brokerId",
"about": "The broker ID to fence. It will be removed from all ISRs." },
diff --git a/metadata/src/main/resources/common/metadata/PartitionRecord.json
b/metadata/src/main/resources/common/metadata/PartitionRecord.json
index 233f852..66a13e2 100644
--- a/metadata/src/main/resources/common/metadata/PartitionRecord.json
+++ b/metadata/src/main/resources/common/metadata/PartitionRecord.json
@@ -17,8 +17,8 @@
"apiKey": 3,
"type": "metadata",
"name": "PartitionRecord",
- "validVersions": "0-1",
- "flexibleVersions": "1+",
+ "validVersions": "0",
+ "flexibleVersions": "0+",
"fields": [
{ "name": "PartitionId", "type": "int32", "versions": "0+", "default":
"-1",
"about": "The partition id." },
diff --git
a/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
b/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
index 89ddf2c..a0e7af2 100644
--- a/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
+++ b/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
@@ -17,8 +17,8 @@
"apiKey": 0,
"type": "metadata",
"name": "RegisterBrokerRecord",
- "validVersions": "0-1",
- "flexibleVersions": "1+",
+ "validVersions": "0",
+ "flexibleVersions": "0+",
"fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType":
"brokerId",
"about": "The broker id." },
@@ -48,7 +48,7 @@
]},
{ "name": "Rack", "type": "string", "versions": "0+", "nullableVersions":
"0+",
"about": "The broker rack." },
- { "name": "Fenced", "type": "bool", "versions": "1+", "default": "true",
+ { "name": "Fenced", "type": "bool", "versions": "0+", "default": "true",
"about": "True if the broker is fenced." }
]
}
diff --git a/metadata/src/main/resources/common/metadata/RemoveTopicRecord.json
b/metadata/src/main/resources/common/metadata/RemoveTopicRecord.json
index f6dc948..be290e3 100644
--- a/metadata/src/main/resources/common/metadata/RemoveTopicRecord.json
+++ b/metadata/src/main/resources/common/metadata/RemoveTopicRecord.json
@@ -17,8 +17,8 @@
"apiKey": 9,
"type": "metadata",
"name": "RemoveTopicRecord",
- "validVersions": "0-1",
- "flexibleVersions": "1+",
+ "validVersions": "0",
+ "flexibleVersions": "0+",
"fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic to remove. All associated partitions will be removed
as well." }
diff --git a/metadata/src/main/resources/common/metadata/TopicRecord.json
b/metadata/src/main/resources/common/metadata/TopicRecord.json
index 8b4f8d9..6fa5a05 100644
--- a/metadata/src/main/resources/common/metadata/TopicRecord.json
+++ b/metadata/src/main/resources/common/metadata/TopicRecord.json
@@ -17,8 +17,8 @@
"apiKey": 2,
"type": "metadata",
"name": "TopicRecord",
- "validVersions": "0-1",
- "flexibleVersions": "1+",
+ "validVersions": "0",
+ "flexibleVersions": "0+",
"fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType":
"topicName",
"about": "The topic name." },
diff --git
a/metadata/src/main/resources/common/metadata/UnfenceBrokerRecord.json
b/metadata/src/main/resources/common/metadata/UnfenceBrokerRecord.json
index 8a6f944..92770a6 100644
--- a/metadata/src/main/resources/common/metadata/UnfenceBrokerRecord.json
+++ b/metadata/src/main/resources/common/metadata/UnfenceBrokerRecord.json
@@ -17,8 +17,8 @@
"apiKey": 8,
"type": "metadata",
"name": "UnfenceBrokerRecord",
- "validVersions": "0-1",
- "flexibleVersions": "1+",
+ "validVersions": "0",
+ "flexibleVersions": "0+",
"fields": [
{ "name": "Id", "type": "int32", "versions": "0+", "entityType":
"brokerId",
"about": "The broker ID to unfence." },
diff --git
a/metadata/src/main/resources/common/metadata/UnregisterBrokerRecord.json
b/metadata/src/main/resources/common/metadata/UnregisterBrokerRecord.json
index 58569bf..358d88a 100644
--- a/metadata/src/main/resources/common/metadata/UnregisterBrokerRecord.json
+++ b/metadata/src/main/resources/common/metadata/UnregisterBrokerRecord.json
@@ -18,7 +18,7 @@
"type": "metadata",
"name": "UnregisterBrokerRecord",
"validVersions": "0",
- "flexibleVersions": "none",
+ "flexibleVersions": "0+",
"fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType":
"brokerId",
"about": "The broker id." },
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ClientQuotaControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ClientQuotaControlManagerTest.java
index e7039d2..b915db3 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ClientQuotaControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ClientQuotaControlManagerTest.java
@@ -211,42 +211,42 @@ public class ClientQuotaControlManagerTest {
Arrays.asList(new ApiMessageAndVersion(new
ClientQuotaRecord().setEntity(Arrays.asList(
new EntityData().setEntityType("user").setEntityName("user-1"),
new
EntityData().setEntityType("client-id").setEntityName("client-id-1"))).
-
setKey("request_percentage").setValue(50.5).setRemove(false), (short) 1)),
+
setKey("request_percentage").setValue(50.5).setRemove(false), (short) 0)),
Arrays.asList(new ApiMessageAndVersion(new
ClientQuotaRecord().setEntity(Arrays.asList(
new EntityData().setEntityType("user").setEntityName("user-2"),
new
EntityData().setEntityType("client-id").setEntityName("client-id-1"))).
-
setKey("request_percentage").setValue(51.51).setRemove(false), (short) 1)),
+
setKey("request_percentage").setValue(51.51).setRemove(false), (short) 0)),
Arrays.asList(new ApiMessageAndVersion(new
ClientQuotaRecord().setEntity(Arrays.asList(
new EntityData().setEntityType("user").setEntityName("user-3"),
new
EntityData().setEntityType("client-id").setEntityName("client-id-2"))).
-
setKey("request_percentage").setValue(52.52).setRemove(false), (short) 1)),
+
setKey("request_percentage").setValue(52.52).setRemove(false), (short) 0)),
Arrays.asList(new ApiMessageAndVersion(new
ClientQuotaRecord().setEntity(Arrays.asList(
new EntityData().setEntityType("user").setEntityName(null),
new
EntityData().setEntityType("client-id").setEntityName("client-id-1"))).
-
setKey("request_percentage").setValue(53.53).setRemove(false), (short) 1)),
+
setKey("request_percentage").setValue(53.53).setRemove(false), (short) 0)),
Arrays.asList(new ApiMessageAndVersion(new
ClientQuotaRecord().setEntity(Arrays.asList(
new EntityData().setEntityType("user").setEntityName("user-1"),
new
EntityData().setEntityType("client-id").setEntityName(null))).
-
setKey("request_percentage").setValue(54.54).setRemove(false), (short) 1)),
+
setKey("request_percentage").setValue(54.54).setRemove(false), (short) 0)),
Arrays.asList(new ApiMessageAndVersion(new
ClientQuotaRecord().setEntity(Arrays.asList(
new EntityData().setEntityType("user").setEntityName("user-3"),
new
EntityData().setEntityType("client-id").setEntityName(null))).
-
setKey("request_percentage").setValue(55.55).setRemove(false), (short) 1)),
+
setKey("request_percentage").setValue(55.55).setRemove(false), (short) 0)),
Arrays.asList(new ApiMessageAndVersion(new
ClientQuotaRecord().setEntity(Arrays.asList(
new
EntityData().setEntityType("user").setEntityName("user-1"))).
-
setKey("request_percentage").setValue(56.56).setRemove(false), (short) 1)),
+
setKey("request_percentage").setValue(56.56).setRemove(false), (short) 0)),
Arrays.asList(new ApiMessageAndVersion(new
ClientQuotaRecord().setEntity(Arrays.asList(
new
EntityData().setEntityType("user").setEntityName("user-2"))).
-
setKey("request_percentage").setValue(57.57).setRemove(false), (short) 1)),
+
setKey("request_percentage").setValue(57.57).setRemove(false), (short) 0)),
Arrays.asList(new ApiMessageAndVersion(new
ClientQuotaRecord().setEntity(Arrays.asList(
new
EntityData().setEntityType("user").setEntityName("user-3"))).
-
setKey("request_percentage").setValue(58.58).setRemove(false), (short) 1)),
+
setKey("request_percentage").setValue(58.58).setRemove(false), (short) 0)),
Arrays.asList(new ApiMessageAndVersion(new
ClientQuotaRecord().setEntity(Arrays.asList(
new EntityData().setEntityType("user").setEntityName(null))).
-
setKey("request_percentage").setValue(59.59).setRemove(false), (short) 1)),
+
setKey("request_percentage").setValue(59.59).setRemove(false), (short) 0)),
Arrays.asList(new ApiMessageAndVersion(new
ClientQuotaRecord().setEntity(Arrays.asList(
new
EntityData().setEntityType("client-id").setEntityName("client-id-2"))).
-
setKey("request_percentage").setValue(60.60).setRemove(false), (short) 1))),
+
setKey("request_percentage").setValue(60.60).setRemove(false), (short) 0))),
manager.iterator(Long.MAX_VALUE));
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index 1fae0aa..195b02a 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -185,7 +185,7 @@ public class ClusterControlManagerTest {
setPort((short) 9092).
setName("PLAINTEXT").
setHost("example.com")).iterator())).
- setFenced(false), (short) 1)),
+ setFenced(false), (short) 0)),
Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
setBrokerEpoch(100).setBrokerId(1).setRack(null).
setEndPoints(new
BrokerEndpointCollection(Collections.singleton(
@@ -193,7 +193,7 @@ public class ClusterControlManagerTest {
setPort((short) 9093).
setName("PLAINTEXT").
setHost("example.com")).iterator())).
- setFenced(false), (short) 1)),
+ setFenced(false), (short) 0)),
Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
setBrokerEpoch(100).setBrokerId(2).setRack(null).
setEndPoints(new
BrokerEndpointCollection(Collections.singleton(
@@ -201,7 +201,7 @@ public class ClusterControlManagerTest {
setPort((short) 9094).
setName("PLAINTEXT").
setHost("example.com")).iterator())).
- setFenced(true), (short) 1))),
+ setFenced(true), (short) 0))),
clusterControl.iterator(Long.MAX_VALUE));
}
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index ec15e66..9cba393 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -107,10 +107,10 @@ public class ConfigurationControlManagerTest {
RecordTestUtils.assertBatchIteratorContains(Arrays.asList(
Arrays.asList(new ApiMessageAndVersion(new ConfigRecord().
setResourceType(TOPIC.id()).setResourceName("mytopic").
- setName("abc").setValue("x,y,z"), (short) 1),
+ setName("abc").setValue("x,y,z"), (short) 0),
new ApiMessageAndVersion(new ConfigRecord().
setResourceType(TOPIC.id()).setResourceName("mytopic").
- setName("def").setValue("blah"), (short) 1))),
+ setName("def").setValue("blah"), (short) 0))),
manager.iterator(Long.MAX_VALUE));
}
@@ -146,7 +146,7 @@ public class ConfigurationControlManagerTest {
new ConfigurationControlManager(new LogContext(),
snapshotRegistry, CONFIGS);
assertEquals(ControllerResult.atomicOf(Collections.singletonList(new
ApiMessageAndVersion(
new
ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
- setName("abc").setValue("123"), (short) 1)),
+ setName("abc").setValue("123"), (short) 0)),
toMap(entry(BROKER0, new ApiError(Errors.INVALID_REQUEST,
"A DELETE op was given with a non-null value.")),
entry(MYTOPIC, ApiError.NONE))),
@@ -187,10 +187,10 @@ public class ConfigurationControlManagerTest {
List<ApiMessageAndVersion> expectedRecords1 = Arrays.asList(
new ApiMessageAndVersion(new ConfigRecord().
setResourceType(TOPIC.id()).setResourceName("mytopic").
- setName("abc").setValue("456"), (short) 1),
+ setName("abc").setValue("456"), (short) 0),
new ApiMessageAndVersion(new ConfigRecord().
setResourceType(TOPIC.id()).setResourceName("mytopic").
- setName("def").setValue("901"), (short) 1));
+ setName("def").setValue("901"), (short) 0));
assertEquals(
ControllerResult.atomicOf(
expectedRecords1,
@@ -212,7 +212,7 @@ public class ConfigurationControlManagerTest {
.setResourceName("mytopic")
.setName("abc")
.setValue(null),
- (short) 1
+ (short) 0
)
),
toMap(entry(MYTOPIC, ApiError.NONE))
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 45c969d..38a368d 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -37,20 +37,20 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
-import
org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
+import
org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.message.BrokerRegistrationRequestData.Listener;
import
org.apache.kafka.common.message.BrokerRegistrationRequestData.ListenerCollection;
-import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import
org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
import
org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignmentCollection;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
-import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
@@ -58,9 +58,9 @@ import
org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.ProducerIdsRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;
import
org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpointCollection;
-import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
@@ -77,8 +77,6 @@ import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.SnapshotReader;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import static java.util.concurrent.TimeUnit.HOURS;
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
@@ -93,8 +91,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 40)
public class QuorumControllerTest {
- private static final Logger log =
- LoggerFactory.getLogger(QuorumControllerTest.class);
/**
* Test creating a new QuorumController and closing it.
@@ -424,17 +420,17 @@ public class QuorumControllerTest {
private List<ApiMessageAndVersion> expectedSnapshotContent(Uuid fooId,
Map<Integer, Long> brokerEpochs) {
return Arrays.asList(
new ApiMessageAndVersion(new TopicRecord().
- setName("foo").setTopicId(fooId), (short) 1),
+ setName("foo").setTopicId(fooId), (short) 0),
new ApiMessageAndVersion(new PartitionRecord().setPartitionId(0).
setTopicId(fooId).setReplicas(Arrays.asList(0, 1, 2)).
setIsr(Arrays.asList(0, 1,
2)).setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()).setLeader(0).setLeaderEpoch(0).
- setPartitionEpoch(0), (short) 1),
+ setPartitionEpoch(0), (short) 0),
new ApiMessageAndVersion(new PartitionRecord().setPartitionId(1).
setTopicId(fooId).setReplicas(Arrays.asList(1, 2, 0)).
setIsr(Arrays.asList(1, 2,
0)).setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()).setLeader(1).setLeaderEpoch(0).
- setPartitionEpoch(0), (short) 1),
+ setPartitionEpoch(0), (short) 0),
new ApiMessageAndVersion(new RegisterBrokerRecord().
setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0)).
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB0")).
@@ -444,7 +440,7 @@ public class QuorumControllerTest {
new
BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
setPort(9092).setSecurityProtocol((short)
0)).iterator())).
setRack(null).
- setFenced(false), (short) 1),
+ setFenced(false), (short) 0),
new ApiMessageAndVersion(new RegisterBrokerRecord().
setBrokerId(1).setBrokerEpoch(brokerEpochs.get(1)).
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB1")).
@@ -454,7 +450,7 @@ public class QuorumControllerTest {
new
BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
setPort(9093).setSecurityProtocol((short)
0)).iterator())).
setRack(null).
- setFenced(false), (short) 1),
+ setFenced(false), (short) 0),
new ApiMessageAndVersion(new RegisterBrokerRecord().
setBrokerId(2).setBrokerEpoch(brokerEpochs.get(2)).
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB2")).
@@ -464,14 +460,14 @@ public class QuorumControllerTest {
new
BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
setPort(9094).setSecurityProtocol((short)
0)).iterator())).
setRack(null).
- setFenced(false), (short) 1),
+ setFenced(false), (short) 0),
new ApiMessageAndVersion(new RegisterBrokerRecord().
setBrokerId(3).setBrokerEpoch(brokerEpochs.get(3)).
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB3")).
setEndPoints(new BrokerEndpointCollection(Arrays.asList(
new
BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
setPort(9095).setSecurityProtocol((short)
0)).iterator())).
- setRack(null), (short) 1),
+ setRack(null), (short) 0),
new ApiMessageAndVersion(new ProducerIdsRecord().
setBrokerId(0).
setBrokerEpoch(brokerEpochs.get(0)).
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 6298c95..0182477 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -201,9 +201,9 @@ public class ReplicationControlManagerTest {
setPartitionId(0).setTopicId(fooId).
setReplicas(Arrays.asList(1, 2,
0)).setIsr(Arrays.asList(1, 2, 0)).
setRemovingReplicas(Collections.emptyList()).setAddingReplicas(Collections.emptyList()).setLeader(1).
- setLeaderEpoch(0).setPartitionEpoch(0), (short) 1),
+ setLeaderEpoch(0).setPartitionEpoch(0), (short) 0),
new ApiMessageAndVersion(new TopicRecord().
- setTopicId(fooId).setName("foo"), (short) 1))),
+ setTopicId(fooId).setName("foo"), (short) 0))),
ctx.replicationControl.iterator(Long.MAX_VALUE));
}
diff --git
a/metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java
b/metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java
index 3a25b8d..cbcbe85 100644
---
a/metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java
+++
b/metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java
@@ -101,7 +101,7 @@ class MetadataRecordSerdeTest {
MetadataRecordSerde serde = new MetadataRecordSerde();
ByteBuffer buffer = ByteBuffer.allocate(64);
buffer.clear();
- buffer.put((byte) 0x00);
+ buffer.put((byte) 0x01);
buffer.put((byte) 0x80);
buffer.put((byte) 0x80);
buffer.put((byte) 0x80);
@@ -123,7 +123,7 @@ class MetadataRecordSerdeTest {
MetadataRecordSerde serde = new MetadataRecordSerde();
ByteBuffer buffer = ByteBuffer.allocate(64);
buffer.clear();
- buffer.put((byte) 0x00);
+ buffer.put((byte) 0x01);
buffer.put((byte) 0x08);
buffer.put((byte) 0x80);
buffer.put((byte) 0x80);
@@ -145,7 +145,7 @@ class MetadataRecordSerdeTest {
MetadataRecordSerde serde = new MetadataRecordSerde();
ByteBuffer buffer = ByteBuffer.allocate(64);
buffer.clear();
- buffer.put((byte) 0x00); // frame version
+ buffer.put((byte) 0x01); // frame version
buffer.put((byte) 0x08); // apiKey
buffer.put((byte) 0xff); // api version
buffer.put((byte) 0xff); // api version
@@ -166,7 +166,7 @@ class MetadataRecordSerdeTest {
public void testParsingUnsupportedApiKey() {
MetadataRecordSerde serde = new MetadataRecordSerde();
ByteBuffer buffer = ByteBuffer.allocate(64);
- buffer.put((byte) 0x00); // frame version
+ buffer.put((byte) 0x01); // frame version
buffer.put((byte) 0xff); // apiKey
buffer.put((byte) 0x7f); // apiKey
buffer.put((byte) 0x00); // api version
@@ -185,7 +185,7 @@ class MetadataRecordSerdeTest {
public void testParsingMalformedMessage() {
MetadataRecordSerde serde = new MetadataRecordSerde();
ByteBuffer buffer = ByteBuffer.allocate(4);
- buffer.put((byte) 0x00); // frame version
+ buffer.put((byte) 0x01); // frame version
buffer.put((byte) 0x00); // apiKey
buffer.put((byte) 0x00); // apiVersion
buffer.put((byte) 0x80); // malformed data
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java
b/server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java
index d292e39..7533178 100644
---
a/server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java
+++
b/server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java
@@ -42,7 +42,7 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
* </pre>
*/
public abstract class AbstractApiMessageSerde implements
RecordSerde<ApiMessageAndVersion> {
- private static final short DEFAULT_FRAME_VERSION = 0;
+ private static final short DEFAULT_FRAME_VERSION = 1;
private static final int DEFAULT_FRAME_VERSION_SIZE =
ByteUtils.sizeOfUnsignedVarint(DEFAULT_FRAME_VERSION);
private static short unsignedIntToShort(Readable input, String entity) {
@@ -83,7 +83,10 @@ public abstract class AbstractApiMessageSerde implements
RecordSerde<ApiMessageA
int size) {
short frameVersion = unsignedIntToShort(input, "frame version");
- if (frameVersion != DEFAULT_FRAME_VERSION) {
+ if (frameVersion == 0) {
+ throw new MetadataParseException("Could not deserialize metadata
record with frame version 0. " +
+ "Note that upgrades from the preview release of KRaft in 2.8
to newer versions are not supported.");
+ } else if (frameVersion != DEFAULT_FRAME_VERSION) {
throw new MetadataParseException("Could not deserialize metadata
record due to unknown frame version "
+ frameVersion + "(only frame version " +
DEFAULT_FRAME_VERSION + " is supported)");
}