This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1af5faef737 KAFKA-20130 Move RecordValidationStats to storage module
(#21418)
1af5faef737 is described below
commit 1af5faef737899e86f83aff75efbda1ac8fa9bca
Author: Christo Lolov <[email protected]>
AuthorDate: Thu Feb 12 09:36:50 2026 +0000
KAFKA-20130 Move RecordValidationStats to storage module (#21418)
Moved `RecordValidationStats` from the clients module to the storage module
and converted it into a record for immutability. Removed `add()` because it
did not appear to be used anywhere.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../record/internal/RecordValidationStats.java | 75 ----------------------
core/src/main/scala/kafka/server/KafkaApis.scala | 2 +-
.../main/scala/kafka/server/ReplicaManager.scala | 2 +-
.../AbstractCoordinatorConcurrencyTest.scala | 4 +-
.../group/CoordinatorPartitionWriterTest.scala | 4 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 2 +-
.../unit/kafka/server/MockFetcherThread.scala | 2 +-
.../kafka/server/ReplicaFetcherThreadTest.scala | 5 +-
.../kafka/storage/internals/log/LogAppendInfo.java | 1 -
.../kafka/storage/internals/log/LogValidator.java | 1 -
.../internals/log/RecordValidationStats.java | 28 ++++++++
.../kafka/storage/internals/log/UnifiedLog.java | 1 -
.../storage/internals/log/LogValidatorTest.java | 1 -
13 files changed, 38 insertions(+), 90 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/record/internal/RecordValidationStats.java
b/clients/src/main/java/org/apache/kafka/common/record/internal/RecordValidationStats.java
deleted file mode 100644
index 4f9905cdcc0..00000000000
---
a/clients/src/main/java/org/apache/kafka/common/record/internal/RecordValidationStats.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.record.internal;
-
-/**
- * This class tracks resource usage during broker record validation for eventual reporting in metrics.
- * Record validation covers integrity checks on inbound data (e.g. checksum verification), structural
- * validation to make sure that records are well-formed, and conversion between record formats if needed.
- */
-public class RecordValidationStats {
-
- public static final RecordValidationStats EMPTY = new RecordValidationStats();
-
- private long temporaryMemoryBytes;
- private int numRecordsConverted;
- private long conversionTimeNanos;
-
- public RecordValidationStats(long temporaryMemoryBytes, int numRecordsConverted, long conversionTimeNanos) {
- this.temporaryMemoryBytes = temporaryMemoryBytes;
- this.numRecordsConverted = numRecordsConverted;
- this.conversionTimeNanos = conversionTimeNanos;
- }
-
- public RecordValidationStats() {
- this(0, 0, 0);
- }
-
- public void add(RecordValidationStats stats) {
- temporaryMemoryBytes += stats.temporaryMemoryBytes;
- numRecordsConverted += stats.numRecordsConverted;
- conversionTimeNanos += stats.conversionTimeNanos;
- }
-
- /**
- * Returns the number of temporary memory bytes allocated to process the records.
- * This size depends on whether the records need decompression and/or conversion:
- * <ul>
- * <li>Non compressed, no conversion: zero</li>
- * <li>Non compressed, with conversion: size of the converted buffer</li>
- * <li>Compressed, no conversion: size of the original buffer after decompression</li>
- * <li>Compressed, with conversion: size of the original buffer after decompression + size of the converted buffer uncompressed</li>
- * </ul>
- */
- public long temporaryMemoryBytes() {
- return temporaryMemoryBytes;
- }
-
- public int numRecordsConverted() {
- return numRecordsConverted;
- }
-
- public long conversionTimeNanos() {
- return conversionTimeNanos;
- }
-
- @Override
- public String toString() {
- return String.format("RecordValidationStats(temporaryMemoryBytes=%d, numRecordsConverted=%d, conversionTimeNanos=%d)",
- temporaryMemoryBytes, numRecordsConverted, conversionTimeNanos);
- }
-}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6d6f943543f..93b32a50192 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -69,7 +69,7 @@ import
org.apache.kafka.server.share.{ErroneousAndValidPartitionData, ShareParti
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams,
FetchPartitionData}
import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
-import org.apache.kafka.storage.internals.log.AppendOrigin
+import org.apache.kafka.storage.internals.log.{AppendOrigin,
RecordValidationStats}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import java.util
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 35818a3575a..6e433ce1526 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -66,7 +66,7 @@ import org.apache.kafka.server.util.timer.{SystemTimer,
TimerTask}
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, common}
import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints,
OffsetCheckpointFile, OffsetCheckpoints}
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo,
FetchPartitionStatus, LeaderHwChange, LogAppendInfo, LogConfig,
LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, LogReadResult,
OffsetResultHolder, RecordValidationException, RemoteLogReadResult,
RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo,
FetchPartitionStatus, LeaderHwChange, LogAppendInfo, LogConfig,
LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, LogReadResult,
OffsetResultHolder, RecordValidationException, RecordValidationStats,
RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import java.io.File
diff --git
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index 8314c110899..a2bf4ad3537 100644
---
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -28,7 +28,7 @@ import kafka.server._
import kafka.utils._
import org.apache.kafka.common.{TopicIdPartition, TopicPartition}
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.internal.{MemoryRecords, RecordBatch,
RecordValidationStats}
+import org.apache.kafka.common.record.internal.{MemoryRecords, RecordBatch}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.metadata.MetadataCache
@@ -37,7 +37,7 @@ import
org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperation
import
org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation
import org.apache.kafka.server.util.timer.{MockTimer, Timer}
import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
-import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig,
UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig,
RecordValidationStats, UnifiedLog, VerificationGuard}
import org.junit.jupiter.api.{AfterEach, BeforeEach}
import org.mockito.Mockito.{mock, when, withSettings}
diff --git
a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
index 75146a7c08b..bb717c8c004 100644
---
a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
@@ -22,10 +22,10 @@ import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
import
org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.internal.{CompressionType,
ControlRecordType, EndTransactionMarker, MemoryRecords, RecordBatch,
RecordValidationStats, SimpleRecord}
+import org.apache.kafka.common.record.internal.{CompressionType,
ControlRecordType, EndTransactionMarker, MemoryRecords, RecordBatch,
SimpleRecord}
import org.apache.kafka.coordinator.common.runtime.PartitionWriter
import org.apache.kafka.server.common.TransactionVersion
-import org.apache.kafka.storage.internals.log.{AppendOrigin, LogAppendInfo,
LogConfig, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, LogAppendInfo,
LogConfig, RecordValidationStats, VerificationGuard}
import org.apache.kafka.test.TestUtils.assertFutureThrows
import org.junit.jupiter.api.Assertions.{assertEquals, assertNull,
assertThrows, assertTrue}
import org.junit.jupiter.api.Test
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index ad8f3d47916..fad3681091a 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -102,7 +102,7 @@ import org.apache.kafka.server.share.context.{FinalContext,
ShareSessionContext}
import org.apache.kafka.server.share.session.{ShareSession, ShareSessionKey}
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig,
UnifiedLog}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig,
RecordValidationStats, UnifiedLog}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
diff --git a/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala
b/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala
index 81d3f826863..ebd2b656dfd 100644
--- a/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala
+++ b/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala
@@ -24,7 +24,7 @@ import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.server.ReplicaState
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.storage.internals.log.LogAppendInfo
+import org.apache.kafka.storage.internals.log.{LogAppendInfo,
RecordValidationStats}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index c83c80f0322..b77e36ad184 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -18,7 +18,6 @@ package kafka.server
import kafka.cluster.Partition
import kafka.log.LogManager
-
import kafka.server.QuotaFactory.UNBOUNDED_QUOTA
import kafka.server.epoch.util.MockBlockingSender
import kafka.utils.TestUtils
@@ -30,7 +29,7 @@ import org.apache.kafka.common.message.FetchResponseData
import
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
import
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.internal.{CompressionType,
MemoryRecords, RecordBatch, RecordValidationStats, SimpleRecord}
+import org.apache.kafka.common.record.internal.{CompressionType,
MemoryRecords, RecordBatch, SimpleRecord}
import
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH,
UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
import org.apache.kafka.common.utils.{LogContext, Time}
@@ -40,7 +39,7 @@ import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.ReplicaState
import org.apache.kafka.server.PartitionFetchState
import org.apache.kafka.server.config.ReplicationConfigs
-import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogConfig,
UnifiedLog}
+import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogConfig,
RecordValidationStats, UnifiedLog}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java
index 8b0295c7a9c..1aab6341cd3 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java
@@ -18,7 +18,6 @@ package org.apache.kafka.storage.internals.log;
import org.apache.kafka.common.record.internal.CompressionType;
import org.apache.kafka.common.record.internal.RecordBatch;
-import org.apache.kafka.common.record.internal.RecordValidationStats;
import org.apache.kafka.common.requests.ProduceResponse.RecordError;
import java.util.List;
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
index 415f2c7cc1e..dc4035b768e 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
@@ -32,7 +32,6 @@ import
org.apache.kafka.common.record.internal.MemoryRecordsBuilder.RecordsInfo;
import org.apache.kafka.common.record.internal.MutableRecordBatch;
import org.apache.kafka.common.record.internal.Record;
import org.apache.kafka.common.record.internal.RecordBatch;
-import org.apache.kafka.common.record.internal.RecordValidationStats;
import org.apache.kafka.common.requests.ProduceResponse.RecordError;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.CloseableIterator;
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/RecordValidationStats.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/RecordValidationStats.java
new file mode 100644
index 00000000000..e3e671b2aeb
--- /dev/null
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/RecordValidationStats.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+/**
+ * This class tracks resource usage during broker record validation for eventual reporting in metrics.
+ * Record validation covers integrity checks on inbound data (e.g. checksum verification), structural
+ * validation to make sure that records are well-formed, and conversion between record formats if needed.
+ */
+public record RecordValidationStats(long temporaryMemoryBytes, int numRecordsConverted, long conversionTimeNanos) {
+
+ public static final RecordValidationStats EMPTY = new RecordValidationStats(0, 0, 0);
+
+}
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
index 84ba9099647..3bac4c3b9d8 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
@@ -38,7 +38,6 @@ import org.apache.kafka.common.record.internal.MemoryRecords;
import org.apache.kafka.common.record.internal.MutableRecordBatch;
import org.apache.kafka.common.record.internal.Record;
import org.apache.kafka.common.record.internal.RecordBatch;
-import org.apache.kafka.common.record.internal.RecordValidationStats;
import org.apache.kafka.common.record.internal.RecordVersion;
import org.apache.kafka.common.record.internal.Records;
import org.apache.kafka.common.requests.ListOffsetsRequest;
diff --git
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java
index 15cea1537da..77fe1682cfb 100644
---
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java
+++
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java
@@ -32,7 +32,6 @@ import org.apache.kafka.common.record.internal.MemoryRecords;
import org.apache.kafka.common.record.internal.MemoryRecordsBuilder;
import org.apache.kafka.common.record.internal.Record;
import org.apache.kafka.common.record.internal.RecordBatch;
-import org.apache.kafka.common.record.internal.RecordValidationStats;
import org.apache.kafka.common.record.internal.RecordVersion;
import org.apache.kafka.common.record.internal.SimpleRecord;
import org.apache.kafka.common.utils.PrimitiveRef;