This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d920f8bc1f6 KAFKA-10863 Convert ControlRecordType schema to use auto-generated protocol (#21439)
d920f8bc1f6 is described below

commit d920f8bc1f668b48c73abfd7214dabf14ca45a2b
Author: Ming-Yen Chung <[email protected]>
AuthorDate: Sun Feb 15 08:54:16 2026 +0800

    KAFKA-10863 Convert ControlRecordType schema to use auto-generated protocol (#21439)
    
    Modified from #19110
    
    1. Convert ControlRecordType schema to use auto-generated protocol
    `ControlRecordTypeSchema`.
    2. Substitute `CURRENT_CONTROL_RECORD_KEY_SIZE` with
    `controlRecordKeySize()` method since the size is accumulated from
    `ControlRecordTypeSchema`.
    3. Add a buffer to `ControlRecordType` to avoid computing the key twice in
    `recordKey()` and `controlRecordKeySize()` (a sketch follows this list).
    4. flexibleVersions is set to none.
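
    A minimal round-trip sketch of the new key path (assuming the generated
    `ControlRecordTypeSchema` class plus the existing `MessageUtil` and
    `ByteBufferAccessor` helpers; the wrapper class and the COMMIT type id
    below are illustrative only):

        import java.nio.ByteBuffer;

        import org.apache.kafka.common.message.ControlRecordTypeSchema;
        import org.apache.kafka.common.protocol.ByteBufferAccessor;
        import org.apache.kafka.common.protocol.MessageUtil;

        public class ControlRecordKeyRoundTrip {
            public static void main(String[] args) {
                // Serialize once: a version-prefixed key, cached per enum constant.
                ControlRecordTypeSchema schema = new ControlRecordTypeSchema().setType((short) 1);
                ByteBuffer key = MessageUtil.toVersionPrefixedByteBuffer(
                        ControlRecordTypeSchema.HIGHEST_SUPPORTED_VERSION, schema);
                System.out.println(key.remaining()); // 4: int16 version + int16 type

                // Parse: read the version prefix, then the generated schema.
                short version = key.getShort();
                ControlRecordTypeSchema parsed =
                        new ControlRecordTypeSchema(new ByteBufferAccessor(key), version);
                System.out.println(parsed.type()); // 1, i.e. COMMIT
            }
        }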
    
    Reviewers: Chia-Ping Tsai <[email protected]>
    
    ---------
    
    Co-authored-by: dengziming <[email protected]>
---
 .../common/record/internal/ControlRecordType.java  | 51 ++++++++--------
 .../record/internal/EndTransactionMarker.java      |  2 +-
 .../record/internal/MemoryRecordsBuilder.java      |  6 +-
 .../common/message/ControlRecordTypeSchema.json    | 26 ++++++++
 .../record/internal/ControlRecordTypeTest.java     | 70 ++++++++++++++++++++--
 .../record/internal/EndTransactionMarkerTest.java  |  2 +-
 .../test/scala/unit/kafka/log/LogCleanerTest.scala |  6 +-
 .../kafka/metadata/util/BatchFileReader.java       |  3 +-
 .../kafka/metadata/util/SnapshotFileReader.java    |  3 +-
 9 files changed, 123 insertions(+), 46 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/internal/ControlRecordType.java b/clients/src/main/java/org/apache/kafka/common/record/internal/ControlRecordType.java
index 1a9f8698724..74f6f5f6cc7 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/internal/ControlRecordType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/internal/ControlRecordType.java
@@ -17,10 +17,9 @@
 package org.apache.kafka.common.record.internal;
 
 import org.apache.kafka.common.InvalidRecordException;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.protocol.types.Type;
+import org.apache.kafka.common.message.ControlRecordTypeSchema;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.MessageUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,47 +57,51 @@ public enum ControlRecordType {
     UNKNOWN((short) -1);
 
     private static final Logger log = LoggerFactory.getLogger(ControlRecordType.class);
-
-    static final short CURRENT_CONTROL_RECORD_KEY_VERSION = 0;
-    static final int CURRENT_CONTROL_RECORD_KEY_SIZE = 4;
-    private static final Schema CONTROL_RECORD_KEY_SCHEMA_VERSION_V0 = new Schema(
-            new Field("version", Type.INT16),
-            new Field("type", Type.INT16));
+    private static final int CONTROL_RECORD_KEY_SIZE = 4;
 
     private final short type;
+    private final ByteBuffer buffer;
 
     ControlRecordType(short type) {
         this.type = type;
+        ControlRecordTypeSchema schema = new ControlRecordTypeSchema().setType(type);
+        buffer = MessageUtil.toVersionPrefixedByteBuffer(ControlRecordTypeSchema.HIGHEST_SUPPORTED_VERSION, schema);
     }
 
     public short type() {
         return type;
     }
 
-    public Struct recordKey() {
+    public ByteBuffer recordKey() {
         if (this == UNKNOWN)
             throw new IllegalArgumentException("Cannot serialize UNKNOWN control record type");
+        return buffer.duplicate();
+    }
 
-        Struct struct = new Struct(CONTROL_RECORD_KEY_SCHEMA_VERSION_V0);
-        struct.set("version", CURRENT_CONTROL_RECORD_KEY_VERSION);
-        struct.set("type", type);
-        return struct;
+    public int controlRecordKeySize() {
+        return buffer.remaining();
     }
 
     public static short parseTypeId(ByteBuffer key) {
-        if (key.remaining() < CURRENT_CONTROL_RECORD_KEY_SIZE)
-            throw new InvalidRecordException("Invalid value size found for end 
control record key. Must have " +
-                    "at least " + CURRENT_CONTROL_RECORD_KEY_SIZE + " bytes, 
but found only " + key.remaining());
-
-        short version = key.getShort(0);
-        if (version < 0)
+        // We should duplicate the original buffer since it will be read again in some cases, for example,
+        // read by KafkaRaftClient and RaftClient.Listener
+        ByteBuffer buffer = key.duplicate();
+        if (buffer.remaining() < CONTROL_RECORD_KEY_SIZE)
+            throw new InvalidRecordException("Invalid value size found for 
control record key. " +
+                    "Must have at least " + CONTROL_RECORD_KEY_SIZE + " bytes, 
but found only " + buffer.remaining());
+
+        short version = buffer.getShort();
+        if (version < ControlRecordTypeSchema.LOWEST_SUPPORTED_VERSION)
             throw new InvalidRecordException("Invalid version found for 
control record: " + version +
                     ". May indicate data corruption");
 
-        if (version != CURRENT_CONTROL_RECORD_KEY_VERSION)
+        if (version > ControlRecordTypeSchema.HIGHEST_SUPPORTED_VERSION) {
             log.debug("Received unknown control record key version {}. Parsing 
as version {}", version,
-                    CURRENT_CONTROL_RECORD_KEY_VERSION);
-        return key.getShort(2);
+                    ControlRecordTypeSchema.HIGHEST_SUPPORTED_VERSION);
+            version = ControlRecordTypeSchema.HIGHEST_SUPPORTED_VERSION;
+        }
+        ControlRecordTypeSchema schema = new ControlRecordTypeSchema(new ByteBufferAccessor(buffer), version);
+        return schema.type();
     }
 
     public static ControlRecordType fromTypeId(short typeId) {
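
The `ControlRecordType.parse` helper adopted by the readers and tests below is not shown in this hunk; assuming it keeps its existing shape, it simply chains the two methods above — a sketch:

    // Sketch of the assumed helper inside ControlRecordType (not part of this patch):
    public static ControlRecordType parse(ByteBuffer key) {
        return fromTypeId(parseTypeId(key));
    }
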
diff --git a/clients/src/main/java/org/apache/kafka/common/record/internal/EndTransactionMarker.java b/clients/src/main/java/org/apache/kafka/common/record/internal/EndTransactionMarker.java
index 29e789ba00d..281f32ab864 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/internal/EndTransactionMarker.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/internal/EndTransactionMarker.java
@@ -104,7 +104,7 @@ public class EndTransactionMarker {
 
     public int endTxnMarkerValueSize() {
         return DefaultRecord.sizeInBytes(0, 0L,
-                ControlRecordType.CURRENT_CONTROL_RECORD_KEY_SIZE,
+                type.controlRecordKeySize(),
                 buffer.remaining(),
                 Record.EMPTY_HEADERS);
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/record/internal/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/internal/MemoryRecordsBuilder.java
index 13bd2715e8a..3fd89deaba1 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/internal/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/internal/MemoryRecordsBuilder.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.message.SnapshotFooterRecord;
 import org.apache.kafka.common.message.SnapshotHeaderRecord;
 import org.apache.kafka.common.message.VotersRecord;
 import org.apache.kafka.common.protocol.MessageUtil;
-import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.apache.kafka.common.utils.Utils;
@@ -612,10 +611,7 @@ public class MemoryRecordsBuilder implements AutoCloseable {
      * @param value The control record value
      */
     public void appendControlRecord(long timestamp, ControlRecordType type, ByteBuffer value) {
-        Struct keyStruct = type.recordKey();
-        ByteBuffer key = ByteBuffer.allocate(keyStruct.sizeOf());
-        keyStruct.writeTo(key);
-        key.flip();
+        ByteBuffer key = type.recordKey();
         appendWithOffset(nextSequentialOffset(), true, timestamp, key, value, Record.EMPTY_HEADERS);
     }
 
diff --git a/clients/src/main/resources/common/message/ControlRecordTypeSchema.json b/clients/src/main/resources/common/message/ControlRecordTypeSchema.json
new file mode 100644
index 00000000000..bf168599e65
--- /dev/null
+++ b/clients/src/main/resources/common/message/ControlRecordTypeSchema.json
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "type": "data",
+  "name": "ControlRecordTypeSchema",
+  "validVersions": "0",
+  "flexibleVersions": "none",
+  "fields": [
+    { "name": "Type", "type": "int16", "versions": "0+",
+      "about": "The type of the control record, such as commit or abort"
+    }
+  ]
+}
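
With a single int16 field and "flexibleVersions" set to "none", the serialized key is just a two-byte version prefix followed by the two-byte type, so the key stays four bytes. A throwaway check of that layout (hand-rolled, mirroring what the old hard-coded schema wrote; the class name and COMMIT type id are illustrative):

    import java.nio.ByteBuffer;

    public class KeyLayoutCheck {
        public static void main(String[] args) {
            ByteBuffer key = ByteBuffer.allocate(4);
            key.putShort((short) 0); // version prefix (v0)
            key.putShort((short) 1); // Type field (1 = COMMIT)
            key.flip();
            System.out.println(key.remaining()); // 4, matching controlRecordKeySize()
        }
    }
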
diff --git a/clients/src/test/java/org/apache/kafka/common/record/internal/ControlRecordTypeTest.java b/clients/src/test/java/org/apache/kafka/common/record/internal/ControlRecordTypeTest.java
index 3526b2cc80a..2a0d42bd292 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/internal/ControlRecordTypeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/internal/ControlRecordTypeTest.java
@@ -16,6 +16,12 @@
  */
 package org.apache.kafka.common.record.internal;
 
+import org.apache.kafka.common.message.ControlRecordTypeSchema;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
@@ -26,10 +32,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class ControlRecordTypeTest {
 
+    // Old hard-coded schema, used to validate that the old hard-coded schema format is exactly the same as the new auto-generated protocol format
+    private final Schema v0Schema = new Schema(
+            new Field("version", Type.INT16),
+            new Field("type", Type.INT16));
+
     @Test
     public void testParseUnknownType() {
         ByteBuffer buffer = ByteBuffer.allocate(32);
-        buffer.putShort(ControlRecordType.CURRENT_CONTROL_RECORD_KEY_VERSION);
+        buffer.putShort(ControlRecordTypeSchema.HIGHEST_SUPPORTED_VERSION);
         buffer.putShort((short) 337);
         buffer.flip();
         ControlRecordType type = ControlRecordType.parse(buffer);
@@ -50,11 +61,58 @@ public class ControlRecordTypeTest {
     @ParameterizedTest
     @EnumSource(value = ControlRecordType.class)
     public void testRoundTrip(ControlRecordType expected) {
-        ByteBuffer buffer = ByteBuffer.allocate(32);
-        buffer.putShort(ControlRecordType.CURRENT_CONTROL_RECORD_KEY_VERSION);
-        buffer.putShort(expected.type());
-        buffer.flip();
+        if (expected == ControlRecordType.UNKNOWN) {
+            return;
+        }
+        for (short version = ControlRecordTypeSchema.LOWEST_SUPPORTED_VERSION;
+             version <= ControlRecordTypeSchema.HIGHEST_SUPPORTED_VERSION; version++) {
+            ByteBuffer buffer = expected.recordKey();
+            ControlRecordType deserializedKey = ControlRecordType.parse(buffer);
+            assertEquals(expected, deserializedKey);
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(value = ControlRecordType.class)
+    public void testValueControlRecordKeySize(ControlRecordType type) {
+        for (short version = ControlRecordTypeSchema.LOWEST_SUPPORTED_VERSION;
+             version <= ControlRecordTypeSchema.HIGHEST_SUPPORTED_VERSION; version++) {
+            assertEquals(4, type.controlRecordKeySize());
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(value = ControlRecordType.class)
+    public void testBackwardDeserializeCompatibility(ControlRecordType type) {
+        for (short version = ControlRecordTypeSchema.LOWEST_SUPPORTED_VERSION;
+             version <= ControlRecordTypeSchema.HIGHEST_SUPPORTED_VERSION; version++) {
+            Struct struct = new Struct(v0Schema);
+            struct.set("version", version);
+            struct.set("type", type.type());
+
+            ByteBuffer oldVersionBuffer = ByteBuffer.allocate(struct.sizeOf());
+            struct.writeTo(oldVersionBuffer);
+            oldVersionBuffer.flip();
+
+            ControlRecordType deserializedType = ControlRecordType.parse(oldVersionBuffer);
+            assertEquals(type, deserializedType);
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(value = ControlRecordType.class)
+    public void testForwardDeserializeCompatibility(ControlRecordType type) {
+        if (type == ControlRecordType.UNKNOWN) {
+            return;
+        }
+        for (short version = ControlRecordTypeSchema.LOWEST_SUPPORTED_VERSION;
+             version <= ControlRecordTypeSchema.HIGHEST_SUPPORTED_VERSION; version++) {
+            ByteBuffer newVersionBuffer = type.recordKey();
+
+            Struct struct = v0Schema.read(newVersionBuffer);
 
-        assertEquals(expected, ControlRecordType.parse(buffer));
+            ControlRecordType deserializedType = ControlRecordType.fromTypeId(struct.getShort("type"));
+            assertEquals(type, deserializedType);
+        }
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/record/internal/EndTransactionMarkerTest.java b/clients/src/test/java/org/apache/kafka/common/record/internal/EndTransactionMarkerTest.java
index e1b943bce67..7438e9bba2c 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/internal/EndTransactionMarkerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/internal/EndTransactionMarkerTest.java
@@ -103,7 +103,7 @@ public class EndTransactionMarkerTest {
             EndTransactionMarker marker = new EndTransactionMarker(type, 1);
             int offsetSize = ByteUtils.sizeOfVarint(0);
             int timestampSize = ByteUtils.sizeOfVarlong(0);
-            int keySize = ControlRecordType.CURRENT_CONTROL_RECORD_KEY_SIZE;
+            int keySize = type.controlRecordKeySize();
             int valueSize = marker.serializeValue().remaining();
             int headerSize = ByteUtils.sizeOfVarint(Record.EMPTY_HEADERS.length);
             int totalSize = 1 + offsetSize + timestampSize + ByteUtils.sizeOfVarint(keySize) + keySize + ByteUtils.sizeOfVarint(valueSize) + valueSize + headerSize;
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index b3ab5395918..0485f36b0ab 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1147,11 +1147,7 @@ class LogCleanerTest extends Logging {
 
     // Now we append one transaction with a key which conflicts with the COMMIT marker appended above
     def commitRecordKey(): ByteBuffer = {
-      val keySize = ControlRecordType.COMMIT.recordKey().sizeOf()
-      val key = ByteBuffer.allocate(keySize)
-      ControlRecordType.COMMIT.recordKey().writeTo(key)
-      key.flip()
-      key
+      ControlRecordType.COMMIT.recordKey()
     }
 
     val producerId2 = 2L
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java
index e1be071fb37..1868a3fbb6c 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java
@@ -101,8 +101,7 @@ public final class BatchFileReader implements Iterator<BatchFileReader.BatchAndT
         List<ApiMessageAndVersion> messages = new ArrayList<>();
         for (Record record : input) {
             try {
-                short typeId = ControlRecordType.parseTypeId(record.key());
-                ControlRecordType type = ControlRecordType.fromTypeId(typeId);
+                ControlRecordType type = ControlRecordType.parse(record.key());
                 switch (type) {
                     case LEADER_CHANGE: {
                         LeaderChangeMessage message = new LeaderChangeMessage();
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java b/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
index 5240dac3ba1..4996f79071e 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
@@ -130,8 +130,7 @@ public final class SnapshotFileReader implements AutoCloseable {
     private void handleControlBatch(FileChannelRecordBatch batch) {
         for (Record record : batch) {
             try {
-                short typeId = ControlRecordType.parseTypeId(record.key());
-                ControlRecordType type = ControlRecordType.fromTypeId(typeId);
+                ControlRecordType type = ControlRecordType.parse(record.key());
                 switch (type) {
                     case LEADER_CHANGE:
                         LeaderChangeMessage message = new LeaderChangeMessage();
