This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1af5faef737 KAFKA-20130 Move RecordValidationStats to storage module (#21418)
1af5faef737 is described below

commit 1af5faef737899e86f83aff75efbda1ac8fa9bca
Author: Christo Lolov <[email protected]>
AuthorDate: Thu Feb 12 09:36:50 2026 +0000

    KAFKA-20130 Move RecordValidationStats to storage module (#21418)
    
    Moved `RecordValidationStats` from the clients module to the storage
    module and turned it into a record for immutability. Removed `add()`
    because it did not appear to be used.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../record/internal/RecordValidationStats.java     | 75 ----------------------
 core/src/main/scala/kafka/server/KafkaApis.scala   |  2 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |  2 +-
 .../AbstractCoordinatorConcurrencyTest.scala       |  4 +-
 .../group/CoordinatorPartitionWriterTest.scala     |  4 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  2 +-
 .../unit/kafka/server/MockFetcherThread.scala      |  2 +-
 .../kafka/server/ReplicaFetcherThreadTest.scala    |  5 +-
 .../kafka/storage/internals/log/LogAppendInfo.java |  1 -
 .../kafka/storage/internals/log/LogValidator.java  |  1 -
 .../internals/log/RecordValidationStats.java       | 28 ++++++++
 .../kafka/storage/internals/log/UnifiedLog.java    |  1 -
 .../storage/internals/log/LogValidatorTest.java    |  1 -
 13 files changed, 38 insertions(+), 90 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/internal/RecordValidationStats.java b/clients/src/main/java/org/apache/kafka/common/record/internal/RecordValidationStats.java
deleted file mode 100644
index 4f9905cdcc0..00000000000
--- a/clients/src/main/java/org/apache/kafka/common/record/internal/RecordValidationStats.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.record.internal;
-
-/**
- * This class tracks resource usage during broker record validation for eventual reporting in metrics.
- * Record validation covers integrity checks on inbound data (e.g. checksum verification), structural
- * validation to make sure that records are well-formed, and conversion between record formats if needed.
- */
-public class RecordValidationStats {
-
-    public static final RecordValidationStats EMPTY = new 
RecordValidationStats();
-
-    private long temporaryMemoryBytes;
-    private int numRecordsConverted;
-    private long conversionTimeNanos;
-
-    public RecordValidationStats(long temporaryMemoryBytes, int 
numRecordsConverted, long conversionTimeNanos) {
-        this.temporaryMemoryBytes = temporaryMemoryBytes;
-        this.numRecordsConverted = numRecordsConverted;
-        this.conversionTimeNanos = conversionTimeNanos;
-    }
-
-    public RecordValidationStats() {
-        this(0, 0, 0);
-    }
-
-    public void add(RecordValidationStats stats) {
-        temporaryMemoryBytes += stats.temporaryMemoryBytes;
-        numRecordsConverted += stats.numRecordsConverted;
-        conversionTimeNanos += stats.conversionTimeNanos;
-    }
-
-    /**
-     * Returns the number of temporary memory bytes allocated to process the records.
-     * This size depends on whether the records need decompression and/or conversion:
-     * <ul>
-     *   <li>Non compressed, no conversion: zero</li>
-     *   <li>Non compressed, with conversion: size of the converted buffer</li>
-     *   <li>Compressed, no conversion: size of the original buffer after decompression</li>
-     *   <li>Compressed, with conversion: size of the original buffer after decompression + size of the converted buffer uncompressed</li>
-     * </ul>
-     */
-    public long temporaryMemoryBytes() {
-        return temporaryMemoryBytes;
-    }
-
-    public int numRecordsConverted() {
-        return numRecordsConverted;
-    }
-
-    public long conversionTimeNanos() {
-        return conversionTimeNanos;
-    }
-
-    @Override
-    public String toString() {
-        return String.format("RecordValidationStats(temporaryMemoryBytes=%d, 
numRecordsConverted=%d, conversionTimeNanos=%d)",
-                temporaryMemoryBytes, numRecordsConverted, 
conversionTimeNanos);
-    }
-}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6d6f943543f..93b32a50192 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -69,7 +69,7 @@ import 
org.apache.kafka.server.share.{ErroneousAndValidPartitionData, ShareParti
 import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, 
FetchPartitionData}
 import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
-import org.apache.kafka.storage.internals.log.AppendOrigin
+import org.apache.kafka.storage.internals.log.{AppendOrigin, 
RecordValidationStats}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 
 import java.util
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 35818a3575a..6e433ce1526 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -66,7 +66,7 @@ import org.apache.kafka.server.util.timer.{SystemTimer, 
TimerTask}
 import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
 import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, common}
 import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, 
OffsetCheckpointFile, OffsetCheckpoints}
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, 
FetchPartitionStatus, LeaderHwChange, LogAppendInfo, LogConfig, 
LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, LogReadResult, 
OffsetResultHolder, RecordValidationException, RemoteLogReadResult, 
RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, 
FetchPartitionStatus, LeaderHwChange, LogAppendInfo, LogConfig, 
LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, LogReadResult, 
OffsetResultHolder, RecordValidationException, RecordValidationStats, 
RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 
 import java.io.File
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index 8314c110899..a2bf4ad3537 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -28,7 +28,7 @@ import kafka.server._
 import kafka.utils._
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition}
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.internal.{MemoryRecords, RecordBatch, 
RecordValidationStats}
+import org.apache.kafka.common.record.internal.{MemoryRecords, RecordBatch}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.metadata.MetadataCache
@@ -37,7 +37,7 @@ import 
org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperation
 import 
org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation
 import org.apache.kafka.server.util.timer.{MockTimer, Timer}
 import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
-import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, 
UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, 
RecordValidationStats, UnifiedLog, VerificationGuard}
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
 import org.mockito.Mockito.{mock, when, withSettings}
 
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
index 75146a7c08b..bb717c8c004 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
@@ -22,10 +22,10 @@ import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException
 import 
org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.internal.{CompressionType, 
ControlRecordType, EndTransactionMarker, MemoryRecords, RecordBatch, 
RecordValidationStats, SimpleRecord}
+import org.apache.kafka.common.record.internal.{CompressionType, 
ControlRecordType, EndTransactionMarker, MemoryRecords, RecordBatch, 
SimpleRecord}
 import org.apache.kafka.coordinator.common.runtime.PartitionWriter
 import org.apache.kafka.server.common.TransactionVersion
-import org.apache.kafka.storage.internals.log.{AppendOrigin, LogAppendInfo, 
LogConfig, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, LogAppendInfo, 
LogConfig, RecordValidationStats, VerificationGuard}
 import org.apache.kafka.test.TestUtils.assertFutureThrows
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, 
assertThrows, assertTrue}
 import org.junit.jupiter.api.Test
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index ad8f3d47916..fad3681091a 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -102,7 +102,7 @@ import org.apache.kafka.server.share.context.{FinalContext, 
ShareSessionContext}
 import org.apache.kafka.server.share.session.{ShareSession, ShareSessionKey}
 import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
 import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, 
UnifiedLog}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, 
RecordValidationStats, UnifiedLog}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
diff --git a/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala 
b/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala
index 81d3f826863..ebd2b656dfd 100644
--- a/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala
+++ b/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala
@@ -24,7 +24,7 @@ import org.apache.kafka.common.utils.Time
 import org.apache.kafka.server.common.OffsetAndEpoch
 import org.apache.kafka.server.ReplicaState
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.storage.internals.log.LogAppendInfo
+import org.apache.kafka.storage.internals.log.{LogAppendInfo, 
RecordValidationStats}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions._
 
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index c83c80f0322..b77e36ad184 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -18,7 +18,6 @@ package kafka.server
 
 import kafka.cluster.Partition
 import kafka.log.LogManager
-
 import kafka.server.QuotaFactory.UNBOUNDED_QUOTA
 import kafka.server.epoch.util.MockBlockingSender
 import kafka.utils.TestUtils
@@ -30,7 +29,7 @@ import org.apache.kafka.common.message.FetchResponseData
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.internal.{CompressionType, 
MemoryRecords, RecordBatch, RecordValidationStats, SimpleRecord}
+import org.apache.kafka.common.record.internal.{CompressionType, 
MemoryRecords, RecordBatch, SimpleRecord}
 import 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH,
 UNDEFINED_EPOCH_OFFSET}
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
 import org.apache.kafka.common.utils.{LogContext, Time}
@@ -40,7 +39,7 @@ import org.apache.kafka.server.network.BrokerEndPoint
 import org.apache.kafka.server.ReplicaState
 import org.apache.kafka.server.PartitionFetchState
 import org.apache.kafka.server.config.ReplicationConfigs
-import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogConfig, 
UnifiedLog}
+import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogConfig, 
RecordValidationStats, UnifiedLog}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java
index 8b0295c7a9c..1aab6341cd3 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java
@@ -18,7 +18,6 @@ package org.apache.kafka.storage.internals.log;
 
 import org.apache.kafka.common.record.internal.CompressionType;
 import org.apache.kafka.common.record.internal.RecordBatch;
-import org.apache.kafka.common.record.internal.RecordValidationStats;
 import org.apache.kafka.common.requests.ProduceResponse.RecordError;
 
 import java.util.List;
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
index 415f2c7cc1e..dc4035b768e 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
@@ -32,7 +32,6 @@ import 
org.apache.kafka.common.record.internal.MemoryRecordsBuilder.RecordsInfo;
 import org.apache.kafka.common.record.internal.MutableRecordBatch;
 import org.apache.kafka.common.record.internal.Record;
 import org.apache.kafka.common.record.internal.RecordBatch;
-import org.apache.kafka.common.record.internal.RecordValidationStats;
 import org.apache.kafka.common.requests.ProduceResponse.RecordError;
 import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.CloseableIterator;
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RecordValidationStats.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RecordValidationStats.java
new file mode 100644
index 00000000000..e3e671b2aeb
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RecordValidationStats.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+/**
+ * This class tracks resource usage during broker record validation for eventual reporting in metrics.
+ * Record validation covers integrity checks on inbound data (e.g. checksum verification), structural
+ * validation to make sure that records are well-formed, and conversion between record formats if needed.
+ */
+public record RecordValidationStats(long temporaryMemoryBytes, int 
numRecordsConverted, long conversionTimeNanos) {
+
+    public static final RecordValidationStats EMPTY = new 
RecordValidationStats(0, 0, 0);
+
+}
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
index 84ba9099647..3bac4c3b9d8 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
@@ -38,7 +38,6 @@ import org.apache.kafka.common.record.internal.MemoryRecords;
 import org.apache.kafka.common.record.internal.MutableRecordBatch;
 import org.apache.kafka.common.record.internal.Record;
 import org.apache.kafka.common.record.internal.RecordBatch;
-import org.apache.kafka.common.record.internal.RecordValidationStats;
 import org.apache.kafka.common.record.internal.RecordVersion;
 import org.apache.kafka.common.record.internal.Records;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
diff --git 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java
 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java
index 15cea1537da..77fe1682cfb 100644
--- 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java
@@ -32,7 +32,6 @@ import org.apache.kafka.common.record.internal.MemoryRecords;
 import org.apache.kafka.common.record.internal.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.internal.Record;
 import org.apache.kafka.common.record.internal.RecordBatch;
-import org.apache.kafka.common.record.internal.RecordValidationStats;
 import org.apache.kafka.common.record.internal.RecordVersion;
 import org.apache.kafka.common.record.internal.SimpleRecord;
 import org.apache.kafka.common.utils.PrimitiveRef;

Reply via email to