This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d920f8bc1f6 KAFKA-10863 Convert ControlRecordType schema to use
auto-generated protocol (#21439)
d920f8bc1f6 is described below
commit d920f8bc1f668b48c73abfd7214dabf14ca45a2b
Author: Ming-Yen Chung <[email protected]>
AuthorDate: Sun Feb 15 08:54:16 2026 +0800
KAFKA-10863 Convert ControlRecordType schema to use auto-generated protocol
(#21439)
Modified From #19110
1. Convert ControlRecordType schema to use auto-generated protocol
`ControlRecordTypeSchema`.
2. Substitute `CURRENT_CONTROL_RECORD_KEY_SIZE` with the
`controlRecordKeySize()` method, since the size is now derived from the
serialized `ControlRecordTypeSchema`.
3. Add a cached buffer to `ControlRecordType` so that the key is not
serialized twice by `recordKey()` and `controlRecordKeySize()`.
4. flexibleVersions is set to none.
Reviewers: Chia-Ping Tsai <[email protected]>
---------
Co-authored-by: dengziming <[email protected]>
---
.../common/record/internal/ControlRecordType.java | 51 ++++++++--------
.../record/internal/EndTransactionMarker.java | 2 +-
.../record/internal/MemoryRecordsBuilder.java | 6 +-
.../common/message/ControlRecordTypeSchema.json | 26 ++++++++
.../record/internal/ControlRecordTypeTest.java | 70 ++++++++++++++++++++--
.../record/internal/EndTransactionMarkerTest.java | 2 +-
.../test/scala/unit/kafka/log/LogCleanerTest.scala | 6 +-
.../kafka/metadata/util/BatchFileReader.java | 3 +-
.../kafka/metadata/util/SnapshotFileReader.java | 3 +-
9 files changed, 123 insertions(+), 46 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/record/internal/ControlRecordType.java
b/clients/src/main/java/org/apache/kafka/common/record/internal/ControlRecordType.java
index 1a9f8698724..74f6f5f6cc7 100644
---
a/clients/src/main/java/org/apache/kafka/common/record/internal/ControlRecordType.java
+++
b/clients/src/main/java/org/apache/kafka/common/record/internal/ControlRecordType.java
@@ -17,10 +17,9 @@
package org.apache.kafka.common.record.internal;
import org.apache.kafka.common.InvalidRecordException;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.protocol.types.Type;
+import org.apache.kafka.common.message.ControlRecordTypeSchema;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.MessageUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,47 +57,51 @@ public enum ControlRecordType {
UNKNOWN((short) -1);
private static final Logger log =
LoggerFactory.getLogger(ControlRecordType.class);
-
- static final short CURRENT_CONTROL_RECORD_KEY_VERSION = 0;
- static final int CURRENT_CONTROL_RECORD_KEY_SIZE = 4;
- private static final Schema CONTROL_RECORD_KEY_SCHEMA_VERSION_V0 = new
Schema(
- new Field("version", Type.INT16),
- new Field("type", Type.INT16));
+ private static final int CONTROL_RECORD_KEY_SIZE = 4;
private final short type;
+ private final ByteBuffer buffer;
ControlRecordType(short type) {
this.type = type;
+ ControlRecordTypeSchema schema = new
ControlRecordTypeSchema().setType(type);
+ buffer =
MessageUtil.toVersionPrefixedByteBuffer(ControlRecordTypeSchema.HIGHEST_SUPPORTED_VERSION,
schema);
}
public short type() {
return type;
}
- public Struct recordKey() {
+ public ByteBuffer recordKey() {
if (this == UNKNOWN)
throw new IllegalArgumentException("Cannot serialize UNKNOWN
control record type");
+ return buffer.duplicate();
+ }
- Struct struct = new Struct(CONTROL_RECORD_KEY_SCHEMA_VERSION_V0);
- struct.set("version", CURRENT_CONTROL_RECORD_KEY_VERSION);
- struct.set("type", type);
- return struct;
+ public int controlRecordKeySize() {
+ return buffer.remaining();
}
public static short parseTypeId(ByteBuffer key) {
- if (key.remaining() < CURRENT_CONTROL_RECORD_KEY_SIZE)
- throw new InvalidRecordException("Invalid value size found for end
control record key. Must have " +
- "at least " + CURRENT_CONTROL_RECORD_KEY_SIZE + " bytes,
but found only " + key.remaining());
-
- short version = key.getShort(0);
- if (version < 0)
+ // We should duplicate the original buffer since it will be read again
in some cases, for example,
+ // read by KafkaRaftClient and RaftClient.Listener
+ ByteBuffer buffer = key.duplicate();
+ if (buffer.remaining() < CONTROL_RECORD_KEY_SIZE)
+ throw new InvalidRecordException("Invalid value size found for
control record key. " +
+ "Must have at least " + CONTROL_RECORD_KEY_SIZE + " bytes,
but found only " + buffer.remaining());
+
+ short version = buffer.getShort();
+ if (version < ControlRecordTypeSchema.LOWEST_SUPPORTED_VERSION)
throw new InvalidRecordException("Invalid version found for
control record: " + version +
". May indicate data corruption");
- if (version != CURRENT_CONTROL_RECORD_KEY_VERSION)
+ if (version > ControlRecordTypeSchema.HIGHEST_SUPPORTED_VERSION) {
log.debug("Received unknown control record key version {}. Parsing
as version {}", version,
- CURRENT_CONTROL_RECORD_KEY_VERSION);
- return key.getShort(2);
+ ControlRecordTypeSchema.HIGHEST_SUPPORTED_VERSION);
+ version = ControlRecordTypeSchema.HIGHEST_SUPPORTED_VERSION;
+ }
+ ControlRecordTypeSchema schema = new ControlRecordTypeSchema(new
ByteBufferAccessor(buffer), version);
+ return schema.type();
}
public static ControlRecordType fromTypeId(short typeId) {
diff --git
a/clients/src/main/java/org/apache/kafka/common/record/internal/EndTransactionMarker.java
b/clients/src/main/java/org/apache/kafka/common/record/internal/EndTransactionMarker.java
index 29e789ba00d..281f32ab864 100644
---
a/clients/src/main/java/org/apache/kafka/common/record/internal/EndTransactionMarker.java
+++
b/clients/src/main/java/org/apache/kafka/common/record/internal/EndTransactionMarker.java
@@ -104,7 +104,7 @@ public class EndTransactionMarker {
public int endTxnMarkerValueSize() {
return DefaultRecord.sizeInBytes(0, 0L,
- ControlRecordType.CURRENT_CONTROL_RECORD_KEY_SIZE,
+ type.controlRecordKeySize(),
buffer.remaining(),
Record.EMPTY_HEADERS);
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/record/internal/MemoryRecordsBuilder.java
b/clients/src/main/java/org/apache/kafka/common/record/internal/MemoryRecordsBuilder.java
index 13bd2715e8a..3fd89deaba1 100644
---
a/clients/src/main/java/org/apache/kafka/common/record/internal/MemoryRecordsBuilder.java
+++
b/clients/src/main/java/org/apache/kafka/common/record/internal/MemoryRecordsBuilder.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.message.SnapshotFooterRecord;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.message.VotersRecord;
import org.apache.kafka.common.protocol.MessageUtil;
-import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.Utils;
@@ -612,10 +611,7 @@ public class MemoryRecordsBuilder implements AutoCloseable
{
* @param value The control record value
*/
public void appendControlRecord(long timestamp, ControlRecordType type,
ByteBuffer value) {
- Struct keyStruct = type.recordKey();
- ByteBuffer key = ByteBuffer.allocate(keyStruct.sizeOf());
- keyStruct.writeTo(key);
- key.flip();
+ ByteBuffer key = type.recordKey();
appendWithOffset(nextSequentialOffset(), true, timestamp, key, value,
Record.EMPTY_HEADERS);
}
diff --git
a/clients/src/main/resources/common/message/ControlRecordTypeSchema.json
b/clients/src/main/resources/common/message/ControlRecordTypeSchema.json
new file mode 100644
index 00000000000..bf168599e65
--- /dev/null
+++ b/clients/src/main/resources/common/message/ControlRecordTypeSchema.json
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+ "type": "data",
+ "name": "ControlRecordTypeSchema",
+ "validVersions": "0",
+ "flexibleVersions": "none",
+ "fields": [
+ { "name": "Type", "type": "int16", "versions": "0+",
+ "about": "The type of the control record, such as commit or abort"
+ }
+ ]
+}
diff --git
a/clients/src/test/java/org/apache/kafka/common/record/internal/ControlRecordTypeTest.java
b/clients/src/test/java/org/apache/kafka/common/record/internal/ControlRecordTypeTest.java
index 3526b2cc80a..2a0d42bd292 100644
---
a/clients/src/test/java/org/apache/kafka/common/record/internal/ControlRecordTypeTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/record/internal/ControlRecordTypeTest.java
@@ -16,6 +16,12 @@
*/
package org.apache.kafka.common.record.internal;
+import org.apache.kafka.common.message.ControlRecordTypeSchema;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
@@ -26,10 +32,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class ControlRecordTypeTest {
+ // Old hard-coded schema, used to validate old hard-coded schema format is
exactly the same as new auto generated protocol format
+ private final Schema v0Schema = new Schema(
+ new Field("version", Type.INT16),
+ new Field("type", Type.INT16));
+
@Test
public void testParseUnknownType() {
ByteBuffer buffer = ByteBuffer.allocate(32);
- buffer.putShort(ControlRecordType.CURRENT_CONTROL_RECORD_KEY_VERSION);
+ buffer.putShort(ControlRecordTypeSchema.HIGHEST_SUPPORTED_VERSION);
buffer.putShort((short) 337);
buffer.flip();
ControlRecordType type = ControlRecordType.parse(buffer);
@@ -50,11 +61,58 @@ public class ControlRecordTypeTest {
@ParameterizedTest
@EnumSource(value = ControlRecordType.class)
public void testRoundTrip(ControlRecordType expected) {
- ByteBuffer buffer = ByteBuffer.allocate(32);
- buffer.putShort(ControlRecordType.CURRENT_CONTROL_RECORD_KEY_VERSION);
- buffer.putShort(expected.type());
- buffer.flip();
+ if (expected == ControlRecordType.UNKNOWN) {
+ return;
+ }
+ for (short version = ControlRecordTypeSchema.LOWEST_SUPPORTED_VERSION;
+ version <= ControlRecordTypeSchema.HIGHEST_SUPPORTED_VERSION;
version++) {
+ ByteBuffer buffer = expected.recordKey();
+ ControlRecordType deserializedKey =
ControlRecordType.parse(buffer);
+ assertEquals(expected, deserializedKey);
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = ControlRecordType.class)
+ public void testValueControlRecordKeySize(ControlRecordType type) {
+ for (short version = ControlRecordTypeSchema.LOWEST_SUPPORTED_VERSION;
+ version <= ControlRecordTypeSchema.HIGHEST_SUPPORTED_VERSION;
version++) {
+ assertEquals(4, type.controlRecordKeySize());
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = ControlRecordType.class)
+ public void testBackwardDeserializeCompatibility(ControlRecordType type) {
+ for (short version = ControlRecordTypeSchema.LOWEST_SUPPORTED_VERSION;
+ version <= ControlRecordTypeSchema.HIGHEST_SUPPORTED_VERSION;
version++) {
+ Struct struct = new Struct(v0Schema);
+ struct.set("version", version);
+ struct.set("type", type.type());
+
+ ByteBuffer oldVersionBuffer = ByteBuffer.allocate(struct.sizeOf());
+ struct.writeTo(oldVersionBuffer);
+ oldVersionBuffer.flip();
+
+ ControlRecordType deserializedType =
ControlRecordType.parse(oldVersionBuffer);
+ assertEquals(type, deserializedType);
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = ControlRecordType.class)
+ public void testForwardDeserializeCompatibility(ControlRecordType type) {
+ if (type == ControlRecordType.UNKNOWN) {
+ return;
+ }
+ for (short version = ControlRecordTypeSchema.LOWEST_SUPPORTED_VERSION;
+ version <= ControlRecordTypeSchema.HIGHEST_SUPPORTED_VERSION;
version++) {
+ ByteBuffer newVersionBuffer = type.recordKey();
+
+ Struct struct = v0Schema.read(newVersionBuffer);
- assertEquals(expected, ControlRecordType.parse(buffer));
+ ControlRecordType deserializedType =
ControlRecordType.fromTypeId(struct.getShort("type"));
+ assertEquals(type, deserializedType);
+ }
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/record/internal/EndTransactionMarkerTest.java
b/clients/src/test/java/org/apache/kafka/common/record/internal/EndTransactionMarkerTest.java
index e1b943bce67..7438e9bba2c 100644
---
a/clients/src/test/java/org/apache/kafka/common/record/internal/EndTransactionMarkerTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/record/internal/EndTransactionMarkerTest.java
@@ -103,7 +103,7 @@ public class EndTransactionMarkerTest {
EndTransactionMarker marker = new EndTransactionMarker(type, 1);
int offsetSize = ByteUtils.sizeOfVarint(0);
int timestampSize = ByteUtils.sizeOfVarlong(0);
- int keySize = ControlRecordType.CURRENT_CONTROL_RECORD_KEY_SIZE;
+ int keySize = type.controlRecordKeySize();
int valueSize = marker.serializeValue().remaining();
int headerSize =
ByteUtils.sizeOfVarint(Record.EMPTY_HEADERS.length);
int totalSize = 1 + offsetSize + timestampSize +
ByteUtils.sizeOfVarint(keySize) + keySize + ByteUtils.sizeOfVarint(valueSize) +
valueSize + headerSize;
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index b3ab5395918..0485f36b0ab 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1147,11 +1147,7 @@ class LogCleanerTest extends Logging {
// Now we append one transaction with a key which conflicts with the
COMMIT marker appended above
def commitRecordKey(): ByteBuffer = {
- val keySize = ControlRecordType.COMMIT.recordKey().sizeOf()
- val key = ByteBuffer.allocate(keySize)
- ControlRecordType.COMMIT.recordKey().writeTo(key)
- key.flip()
- key
+ ControlRecordType.COMMIT.recordKey()
}
val producerId2 = 2L
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java
b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java
index e1be071fb37..1868a3fbb6c 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java
@@ -101,8 +101,7 @@ public final class BatchFileReader implements
Iterator<BatchFileReader.BatchAndT
List<ApiMessageAndVersion> messages = new ArrayList<>();
for (Record record : input) {
try {
- short typeId = ControlRecordType.parseTypeId(record.key());
- ControlRecordType type = ControlRecordType.fromTypeId(typeId);
+ ControlRecordType type = ControlRecordType.parse(record.key());
switch (type) {
case LEADER_CHANGE: {
LeaderChangeMessage message = new
LeaderChangeMessage();
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
b/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
index 5240dac3ba1..4996f79071e 100644
---
a/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
+++
b/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
@@ -130,8 +130,7 @@ public final class SnapshotFileReader implements
AutoCloseable {
private void handleControlBatch(FileChannelRecordBatch batch) {
for (Record record : batch) {
try {
- short typeId = ControlRecordType.parseTypeId(record.key());
- ControlRecordType type = ControlRecordType.fromTypeId(typeId);
+ ControlRecordType type = ControlRecordType.parse(record.key());
switch (type) {
case LEADER_CHANGE:
LeaderChangeMessage message = new
LeaderChangeMessage();