This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9dfc5a72423 MINOR: Prefer MetadataRecordSerde.INSTANCE over new 
instances (#22289)
9dfc5a72423 is described below

commit 9dfc5a724239042ed6add3e87c0041e3ef9c2dee
Author: JiayĆ”o Sun <[email protected]>
AuthorDate: Sat May 16 14:06:36 2026 +1200

    MINOR: Prefer MetadataRecordSerde.INSTANCE over new instances (#22289)
    
    Replace `new MetadataRecordSerde()` with `MetadataRecordSerde.INSTANCE`
    across the codebase. MetadataRecordSerde is stateless and thread-safe,
    so a single shared instance is sufficient.
    
    Also add Javadoc on the public constructor to guide future callers
    toward the singleton.
    
    Relevant discussions:
    - https://github.com/apache/kafka/pull/22116#discussion_r3237099653
    - https://github.com/apache/kafka/pull/10990#discussion_r666715085
    
    Verified that MetadataRecordSerde (and its parent
    AbstractApiMessageSerde)  has no mutable instance fields. All methods
    (read/write/recordSize) operate  solely on their parameters and local
    variables, confirming no thread-safety  or state-recycling concerns with
    sharing a single instance.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../src/main/scala/kafka/server/SharedServer.scala |  2 +-
 .../apache/kafka/metadata/MetadataRecordSerde.java |  7 +++++
 .../apache/kafka/metadata/storage/Formatter.java   |  2 +-
 .../kafka/metadata/util/BatchFileReader.java       |  4 +--
 .../kafka/metadata/util/BatchFileWriter.java       |  2 +-
 .../kafka/metadata/util/SnapshotFileReader.java    |  3 +-
 .../apache/kafka/controller/MockRaftClient.java    |  6 ++--
 .../kafka/metadata/MetadataRecordSerdeTest.java    | 36 +++++++++-------------
 .../kafka/server/RaftClusterSnapshotTest.java      |  2 +-
 .../org/apache/kafka/tools/DumpLogSegments.java    |  3 +-
 .../apache/kafka/tools/DumpLogSegmentsTest.java    |  5 ++-
 11 files changed, 33 insertions(+), 39 deletions(-)

diff --git a/core/src/main/scala/kafka/server/SharedServer.scala 
b/core/src/main/scala/kafka/server/SharedServer.scala
index 19f88f62bb7..fcc39b94bec 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -294,7 +294,7 @@ class SharedServer(
           clusterId,
           sharedServerConfig,
           
metaPropsEnsemble.logDirProps.get(metaPropsEnsemble.metadataLogDir.get).directoryId.get,
-          new MetadataRecordSerde,
+          MetadataRecordSerde.INSTANCE,
           KafkaRaftServer.MetadataPartition,
           KafkaRaftServer.MetadataTopicId,
           time,
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/MetadataRecordSerde.java 
b/metadata/src/main/java/org/apache/kafka/metadata/MetadataRecordSerde.java
index 7964fed11c2..de04cf8f58a 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/MetadataRecordSerde.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/MetadataRecordSerde.java
@@ -23,6 +23,13 @@ import 
org.apache.kafka.server.common.serialization.AbstractApiMessageSerde;
 public class MetadataRecordSerde extends AbstractApiMessageSerde {
     public static final MetadataRecordSerde INSTANCE = new 
MetadataRecordSerde();
 
+    /**
+     * Prefer using {@link #INSTANCE} instead of creating new instances,
+     * as this class is stateless and thread-safe.
+     */
+    public MetadataRecordSerde() {
+    }
+
     @Override
     public ApiMessage apiMessageFor(short apiKey) {
         return MetadataRecordType.fromId(apiKey).newMetadataRecord();
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java 
b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
index 3d3d3226826..1ccf0c3d5c0 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
@@ -506,7 +506,7 @@ public class Formatter {
                 Snapshots.BOOTSTRAP_SNAPSHOT_ID)).
             setKraftVersion(KRaftVersion.fromFeatureLevel(kraftVersion)).
             setVoterSet(Optional.of(voterSet));
-        try (RecordsSnapshotWriter<ApiMessageAndVersion> writer = 
builder.build(new MetadataRecordSerde())) {
+        try (RecordsSnapshotWriter<ApiMessageAndVersion> writer = 
builder.build(MetadataRecordSerde.INSTANCE)) {
             writer.freeze();
         }
     }
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java 
b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java
index 1868a3fbb6c..71957dce6cf 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java
@@ -74,12 +74,10 @@ public final class BatchFileReader implements 
Iterator<BatchFileReader.BatchAndT
 
     private final FileRecords fileRecords;
     private Iterator<FileChannelRecordBatch> batchIterator;
-    private final MetadataRecordSerde serde;
 
     private BatchFileReader(FileRecords fileRecords) {
         this.fileRecords = fileRecords;
         this.batchIterator = fileRecords.batchIterator();
-        this.serde = new MetadataRecordSerde();
     }
 
     @Override
@@ -142,7 +140,7 @@ public final class BatchFileReader implements 
Iterator<BatchFileReader.BatchAndT
         for (Record record : input) {
             try {
                 ByteBufferAccessor accessor = new 
ByteBufferAccessor(record.value());
-                ApiMessageAndVersion messageAndVersion = serde.read(accessor, 
record.valueSize());
+                ApiMessageAndVersion messageAndVersion = 
MetadataRecordSerde.INSTANCE.read(accessor, record.valueSize());
                 messages.add(messageAndVersion);
             } catch (Throwable e) {
                 throw new RuntimeException("unable to deserialize record at 
offset " + record.offset(), e);
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileWriter.java 
b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileWriter.java
index 2515821f549..f361f996997 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileWriter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileWriter.java
@@ -97,7 +97,7 @@ public class BatchFileWriter implements AutoCloseable {
             new BatchMemoryPool(5, MAX_BATCH_SIZE_BYTES),
             time,
             Compression.NONE,
-            new MetadataRecordSerde()
+            MetadataRecordSerde.INSTANCE
         );
 
         // Append the snapshot header control record and force it to create a 
batch
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java 
b/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
index fc5a47b1ea5..ea3d4fcd672 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
@@ -58,7 +58,6 @@ public final class SnapshotFileReader implements 
AutoCloseable {
     private final CompletableFuture<Void> caughtUpFuture;
     private FileRecords fileRecords;
     private Iterator<FileChannelRecordBatch> batchIterator;
-    private final MetadataRecordSerde serde = new MetadataRecordSerde();
     private long lastOffset = -1L;
     private volatile OptionalLong highWaterMark = OptionalLong.empty();
 
@@ -155,7 +154,7 @@ public final class SnapshotFileReader implements 
AutoCloseable {
         for (Record record : batch) {
             ByteBufferAccessor accessor = new 
ByteBufferAccessor(record.value());
             try {
-                ApiMessageAndVersion messageAndVersion = serde.read(accessor, 
record.valueSize());
+                ApiMessageAndVersion messageAndVersion = 
MetadataRecordSerde.INSTANCE.read(accessor, record.valueSize());
                 messages.add(messageAndVersion);
             } catch (Throwable e) {
                 log.error("unable to read metadata record at offset {}", 
record.offset(), e);
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/MockRaftClient.java 
b/metadata/src/test/java/org/apache/kafka/controller/MockRaftClient.java
index f84dea081a6..f817717a2b3 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/MockRaftClient.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/MockRaftClient.java
@@ -479,7 +479,7 @@ public final class MockRaftClient implements 
RaftClient<ApiMessageAndVersion>, A
                                 listenerData.handleLoadSnapshot(
                                     RecordsSnapshotReader.of(
                                         snapshot.get(),
-                                        new MetadataRecordSerde(),
+                                        MetadataRecordSerde.INSTANCE,
                                         BufferSupplier.create(),
                                         Integer.MAX_VALUE,
                                         true,
@@ -554,7 +554,7 @@ public final class MockRaftClient implements 
RaftClient<ApiMessageAndVersion>, A
     }
 
     private static int messageSize(ApiMessageAndVersion messageAndVersion, 
ObjectSerializationCache objectCache) {
-        return new MetadataRecordSerde().recordSize(messageAndVersion, 
objectCache);
+        return MetadataRecordSerde.INSTANCE.recordSize(messageAndVersion, 
objectCache);
     }
 
     public void beginShutdown() {
@@ -729,7 +729,7 @@ public final class MockRaftClient implements 
RaftClient<ApiMessageAndVersion>, A
                 .setLastContainedLogTimestamp(lastContainedLogTimestamp)
                 .setTime(new MockTime())
                 .setRawSnapshotWriter(createNewSnapshot(snapshotId))
-                .build(new MetadataRecordSerde())
+                .build(MetadataRecordSerde.INSTANCE)
         );
     }
 
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java 
b/metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java
index 16cb21ee289..c6efa8173c9 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java
@@ -41,22 +41,22 @@ class MetadataRecordSerdeTest {
             .setName("foo")
             .setTopicId(Uuid.randomUuid());
 
-        MetadataRecordSerde serde = new MetadataRecordSerde();
+
 
         for (short version = TopicRecord.LOWEST_SUPPORTED_VERSION; version <= 
TopicRecord.HIGHEST_SUPPORTED_VERSION; version++) {
             ApiMessageAndVersion messageAndVersion = new 
ApiMessageAndVersion(topicRecord, version);
 
             ObjectSerializationCache cache = new ObjectSerializationCache();
-            int size = serde.recordSize(messageAndVersion, cache);
+            int size = 
MetadataRecordSerde.INSTANCE.recordSize(messageAndVersion, cache);
 
             ByteBuffer buffer = ByteBuffer.allocate(size);
             ByteBufferAccessor bufferAccessor = new ByteBufferAccessor(buffer);
 
-            serde.write(messageAndVersion, cache, bufferAccessor);
+            MetadataRecordSerde.INSTANCE.write(messageAndVersion, cache, 
bufferAccessor);
             buffer.flip();
 
             assertEquals(size, buffer.remaining());
-            ApiMessageAndVersion readMessageAndVersion = 
serde.read(bufferAccessor, size);
+            ApiMessageAndVersion readMessageAndVersion = 
MetadataRecordSerde.INSTANCE.read(bufferAccessor, size);
             assertEquals(messageAndVersion, readMessageAndVersion);
         }
     }
@@ -67,10 +67,9 @@ class MetadataRecordSerdeTest {
         ByteUtils.writeUnsignedVarint(15, buffer);
         buffer.flip();
 
-        MetadataRecordSerde serde = new MetadataRecordSerde();
         assertStartsWith("Could not deserialize metadata record due to unknown 
frame version",
                 assertThrows(MetadataParseException.class,
-                        () -> serde.read(new ByteBufferAccessor(buffer), 
16)).getMessage());
+                        () -> MetadataRecordSerde.INSTANCE.read(new 
ByteBufferAccessor(buffer), 16)).getMessage());
     }
 
     /**
@@ -78,7 +77,6 @@ class MetadataRecordSerdeTest {
      */
     @Test
     public void testParsingMalformedFrameVersionVarint() {
-        MetadataRecordSerde serde = new MetadataRecordSerde();
         ByteBuffer buffer = ByteBuffer.allocate(64);
         buffer.clear();
         buffer.put((byte) 0x80);
@@ -91,7 +89,7 @@ class MetadataRecordSerdeTest {
         buffer.limit(64);
         assertStartsWith("Error while reading frame version",
                 assertThrows(MetadataParseException.class,
-                        () -> serde.read(new ByteBufferAccessor(buffer), 
buffer.remaining())).getMessage());
+                        () -> MetadataRecordSerde.INSTANCE.read(new 
ByteBufferAccessor(buffer), buffer.remaining())).getMessage());
     }
 
     /**
@@ -99,7 +97,6 @@ class MetadataRecordSerdeTest {
      */
     @Test
     public void testParsingMalformedMessageTypeVarint() {
-        MetadataRecordSerde serde = new MetadataRecordSerde();
         ByteBuffer buffer = ByteBuffer.allocate(64);
         buffer.clear();
         buffer.put((byte) 0x01);
@@ -113,7 +110,7 @@ class MetadataRecordSerdeTest {
         buffer.limit(64);
         assertStartsWith("Error while reading type",
                 assertThrows(MetadataParseException.class,
-                        () -> serde.read(new ByteBufferAccessor(buffer), 
buffer.remaining())).getMessage());
+                        () -> MetadataRecordSerde.INSTANCE.read(new 
ByteBufferAccessor(buffer), buffer.remaining())).getMessage());
     }
 
     /**
@@ -121,7 +118,6 @@ class MetadataRecordSerdeTest {
      */
     @Test
     public void testParsingMalformedMessageVersionVarint() {
-        MetadataRecordSerde serde = new MetadataRecordSerde();
         ByteBuffer buffer = ByteBuffer.allocate(64);
         buffer.clear();
         buffer.put((byte) 0x01);
@@ -135,7 +131,7 @@ class MetadataRecordSerdeTest {
         buffer.limit(64);
         assertStartsWith("Error while reading version",
                 assertThrows(MetadataParseException.class,
-                        () -> serde.read(new ByteBufferAccessor(buffer), 
buffer.remaining())).getMessage());
+                        () -> MetadataRecordSerde.INSTANCE.read(new 
ByteBufferAccessor(buffer), buffer.remaining())).getMessage());
     }
 
     /**
@@ -143,7 +139,6 @@ class MetadataRecordSerdeTest {
      */
     @Test
     public void testParsingVersionTooLarge() {
-        MetadataRecordSerde serde = new MetadataRecordSerde();
         ByteBuffer buffer = ByteBuffer.allocate(64);
         buffer.clear();
         buffer.put((byte) 0x01); // frame version
@@ -157,7 +152,7 @@ class MetadataRecordSerdeTest {
         buffer.limit(64);
         assertStartsWith("Value for version was too large",
                 assertThrows(MetadataParseException.class,
-                        () -> serde.read(new ByteBufferAccessor(buffer), 
buffer.remaining())).getMessage());
+                        () -> MetadataRecordSerde.INSTANCE.read(new 
ByteBufferAccessor(buffer), buffer.remaining())).getMessage());
     }
 
     /**
@@ -165,7 +160,6 @@ class MetadataRecordSerdeTest {
      */
     @Test
     public void testParsingUnsupportedApiKey() {
-        MetadataRecordSerde serde = new MetadataRecordSerde();
         ByteBuffer buffer = ByteBuffer.allocate(64);
         buffer.put((byte) 0x01); // frame version
         buffer.put((byte) 0xff); // apiKey
@@ -176,7 +170,7 @@ class MetadataRecordSerdeTest {
         buffer.limit(64);
         assertStartsWith("Unknown metadata id ",
                 assertThrows(MetadataParseException.class,
-                        () -> serde.read(new ByteBufferAccessor(buffer), 
buffer.remaining())).getCause().getMessage());
+                        () -> MetadataRecordSerde.INSTANCE.read(new 
ByteBufferAccessor(buffer), buffer.remaining())).getCause().getMessage());
     }
 
     /**
@@ -184,7 +178,6 @@ class MetadataRecordSerdeTest {
      */
     @Test
     public void testParsingMalformedMessage() {
-        MetadataRecordSerde serde = new MetadataRecordSerde();
         ByteBuffer buffer = ByteBuffer.allocate(4);
         buffer.put((byte) 0x01); // frame version
         buffer.put((byte) 0x00); // apiKey
@@ -194,7 +187,7 @@ class MetadataRecordSerdeTest {
         buffer.limit(4);
         assertStartsWith("Failed to deserialize record with type",
                 assertThrows(MetadataParseException.class,
-                        () -> serde.read(new ByteBufferAccessor(buffer), 
buffer.remaining())).getMessage());
+                        () -> MetadataRecordSerde.INSTANCE.read(new 
ByteBufferAccessor(buffer), buffer.remaining())).getMessage());
     }
 
     /**
@@ -202,19 +195,18 @@ class MetadataRecordSerdeTest {
      */
     @Test
     public void testParsingRecordWithGarbageAtEnd() {
-        MetadataRecordSerde serde = new MetadataRecordSerde();
         RegisterBrokerRecord message = new 
RegisterBrokerRecord().setBrokerId(1).setBrokerEpoch(2);
 
         ObjectSerializationCache cache = new ObjectSerializationCache();
         ApiMessageAndVersion messageAndVersion = new 
ApiMessageAndVersion(message, (short) 0);
-        int size = serde.recordSize(messageAndVersion, cache);
+        int size = MetadataRecordSerde.INSTANCE.recordSize(messageAndVersion, 
cache);
         ByteBuffer buffer = ByteBuffer.allocate(size + 1);
 
-        serde.write(messageAndVersion, cache, new ByteBufferAccessor(buffer));
+        MetadataRecordSerde.INSTANCE.write(messageAndVersion, cache, new 
ByteBufferAccessor(buffer));
         buffer.clear();
         assertStartsWith("Found 1 byte(s) of garbage after",
                 assertThrows(MetadataParseException.class,
-                        () -> serde.read(new ByteBufferAccessor(buffer), size 
+ 1)).getMessage());
+                        () -> MetadataRecordSerde.INSTANCE.read(new 
ByteBufferAccessor(buffer), size + 1)).getMessage());
     }
 
     private static void assertStartsWith(String prefix, String str) {
diff --git 
a/server/src/test/java/org/apache/kafka/server/RaftClusterSnapshotTest.java 
b/server/src/test/java/org/apache/kafka/server/RaftClusterSnapshotTest.java
index 616405103e2..0d58bf482fa 100644
--- a/server/src/test/java/org/apache/kafka/server/RaftClusterSnapshotTest.java
+++ b/server/src/test/java/org/apache/kafka/server/RaftClusterSnapshotTest.java
@@ -100,7 +100,7 @@ public class RaftClusterSnapshotTest {
         for (var raftManager : raftManagers.values()) {
             try (var snapshot = RecordsSnapshotReader.of(
                     raftManager.raftLog().latestSnapshot().get(),
-                    new MetadataRecordSerde(),
+                    MetadataRecordSerde.INSTANCE,
                     BufferSupplier.create(),
                     1,
                     true,
diff --git a/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java 
b/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java
index df4077060f6..0ac0b74b6bc 100644
--- a/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java
+++ b/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java
@@ -731,13 +731,12 @@ public class DumpLogSegments {
     }
 
     private static class ClusterMetadataLogMessageParser implements 
MessageParser<String, String> {
-        private final MetadataRecordSerde metadataRecordSerde = new 
MetadataRecordSerde();
 
         @Override
         public ParseResult<String, String> parse(Record record) {
             String output;
             try {
-                ApiMessageAndVersion messageAndVersion = 
metadataRecordSerde.read(
+                ApiMessageAndVersion messageAndVersion = 
MetadataRecordSerde.INSTANCE.read(
                     new ByteBufferAccessor(record.value()), 
record.valueSize());
                 ObjectNode json = new ObjectNode(JsonNodeFactory.instance);
                 json.set("type", new TextNode(
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java 
b/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java
index f919518cc9d..77fba98097c 100644
--- a/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java
@@ -628,12 +628,11 @@ public class DumpLogSegmentsTest {
 
         List<SimpleRecord> records = new ArrayList<>();
         for (ApiMessageAndVersion message : metadataRecords) {
-            MetadataRecordSerde serde = MetadataRecordSerde.INSTANCE;
             ObjectSerializationCache cache = new ObjectSerializationCache();
-            int size = serde.recordSize(message, cache);
+            int size = MetadataRecordSerde.INSTANCE.recordSize(message, cache);
             ByteBuffer buf = ByteBuffer.allocate(size);
             ByteBufferAccessor writer = new ByteBufferAccessor(buf);
-            serde.write(message, cache, writer);
+            MetadataRecordSerde.INSTANCE.write(message, cache, writer);
             buf.flip();
             records.add(new SimpleRecord(null, buf.array()));
         }

Reply via email to