This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 64f7a0a300c70b00c0cc357d0a936ea8b42b69fb
Author: Manikumar Reddy <manikumar.re...@gmail.com>
AuthorDate: Thu Mar 28 11:08:50 2024 +0530

    Revert "KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records (#15542)"

    This reverts commit 8aa39869aae388abf9e3d73e364cb17e2dd6a8f8.
---
 .../kafka/common/record/MemoryRecordsBuilder.java  |  29 ++-
 .../common/record/MemoryRecordsBuilderTest.java    |  22 +--
 .../kafka/common/record/MemoryRecordsTest.java     |   5 +-
 .../kafka/admin/ListOffsetsIntegrationTest.scala   | 218 +++------------------
 .../kafka/server/QuorumTestHarness.scala           |   2 +-
 .../scala/unit/kafka/log/LogValidatorTest.scala    |  27 ++-
 .../kafka/storage/internals/log/LogValidator.java  |  13 +-
 7 files changed, 81 insertions(+), 235 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index de03030d82e..3e9360f04ca 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -242,23 +242,34 @@ public class MemoryRecordsBuilder implements AutoCloseable {

     /**
      * Get the max timestamp and its offset. The details of the offset returned are a bit subtle.
-     * Note: The semantic for the offset of max timestamp is the first offset with the max timestamp if there are multi-records having same timestamp.
      *
-     * If the log append time is used, the offset will be the first offset of the record.
+     * If the log append time is used, the offset will be the last offset unless no compression is used and
+     * the message format version is 0 or 1, in which case, it will be the first offset.
      *
-     * If create time is used, the offset will always be the offset of the record with the max timestamp.
-     *
-     * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 since no timestamp info in records.
+     * If create time is used, the offset will be the last offset unless no compression is used and the message
+     * format version is 0 or 1, in which case, it will be the offset of the record with the max timestamp.
      *
      * @return The max timestamp and its offset
      */
     public RecordsInfo info() {
         if (timestampType == TimestampType.LOG_APPEND_TIME) {
-            return new RecordsInfo(logAppendTime, baseOffset);
+            long shallowOffsetOfMaxTimestamp;
+            // Use the last offset when dealing with record batches
+            if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
+                shallowOffsetOfMaxTimestamp = lastOffset;
+            else
+                shallowOffsetOfMaxTimestamp = baseOffset;
+            return new RecordsInfo(logAppendTime, shallowOffsetOfMaxTimestamp);
+        } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
+            return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
         } else {
-            // For create time, we always use offsetOfMaxTimestamp for the correct time -> offset mapping
-            // If it's MAGIC_VALUE_V0, the value will be the default value: [-1, -1]
-            return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp);
+            long shallowOffsetOfMaxTimestamp;
+            // Use the last offset when dealing with record batches
+            if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
+                shallowOffsetOfMaxTimestamp = lastOffset;
+            else
+                shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp;
+            return new RecordsInfo(maxTimestamp, shallowOffsetOfMaxTimestamp);
         }
     }
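To see the restored info() selection rule end to end, the sketch below drives the builder the same way the tests further down do. It is a minimal illustration against internal clients classes (MemoryRecords.builder and MemoryRecordsBuilder.RecordsInfo are internal, not a supported public API); the buffer size and payloads are arbitrary.

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.TimestampType;

    public class ShallowOffsetDemo {
        public static void main(String[] args) {
            MemoryRecordsBuilder builder = MemoryRecords.builder(
                ByteBuffer.allocate(1024), RecordBatch.MAGIC_VALUE_V2,
                CompressionType.GZIP, TimestampType.CREATE_TIME, 0L);
            builder.append(100L, null, "a".getBytes()); // offset 0
            builder.append(999L, null, "b".getBytes()); // offset 1 carries the max timestamp
            builder.append(200L, null, "c".getBytes()); // offset 2
            builder.close();

            MemoryRecordsBuilder.RecordsInfo info = builder.info();
            // With the revert in place this should print "999 @ 2": compressed or v2
            // batches report the batch's last offset, not the max-timestamp record's
            // offset 1 that the reverted fix reported.
            System.out.println(info.maxTimestamp + " @ " + info.shallowOffsetOfMaxTimestamp);
        }
    }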
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index 0923baea01e..4f3f03c3f2d 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -378,8 +378,10 @@ public class MemoryRecordsBuilderTest {

         MemoryRecordsBuilder.RecordsInfo info = builder.info();
         assertEquals(logAppendTime, info.maxTimestamp);
-        // When logAppendTime is used, the first offset of the batch will be the offset of maxTimestamp
-        assertEquals(0L, info.shallowOffsetOfMaxTimestamp);
+        if (args.compressionType == CompressionType.NONE && magic <= MAGIC_VALUE_V1)
+            assertEquals(0L, info.shallowOffsetOfMaxTimestamp);
+        else
+            assertEquals(2L, info.shallowOffsetOfMaxTimestamp);

         for (RecordBatch batch : records.batches()) {
             if (magic == MAGIC_VALUE_V0) {
@@ -413,11 +415,10 @@ public class MemoryRecordsBuilderTest {
             assertEquals(2L, info.maxTimestamp);
         }

-        if (magic == MAGIC_VALUE_V0)
-            // in MAGIC_VALUE_V0's case, we don't have timestamp info in records, so always return -1.
-            assertEquals(-1L, info.shallowOffsetOfMaxTimestamp);
-        else
+        if (args.compressionType == CompressionType.NONE && magic == MAGIC_VALUE_V1)
             assertEquals(1L, info.shallowOffsetOfMaxTimestamp);
+        else
+            assertEquals(2L, info.shallowOffsetOfMaxTimestamp);

         int i = 0;
         long[] expectedTimestamps = new long[] {0L, 2L, 1L};
@@ -494,13 +495,12 @@ public class MemoryRecordsBuilderTest {
         MemoryRecords records = builder.build();

         MemoryRecordsBuilder.RecordsInfo info = builder.info();
-        if (magic == MAGIC_VALUE_V0) {
+        if (magic == MAGIC_VALUE_V0)
             assertEquals(-1, info.maxTimestamp);
-            assertEquals(-1L, info.shallowOffsetOfMaxTimestamp);
-        } else {
+        else
             assertEquals(2L, info.maxTimestamp);
-            assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
-        }
+
+        assertEquals(2L, info.shallowOffsetOfMaxTimestamp);

         long i = 0L;
         for (RecordBatch batch : records.batches()) {
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 50821af841c..3f0195bf5d1 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -893,7 +893,10 @@ public class MemoryRecordsTest {
             assertEquals(filtered.limit(), result.bytesRetained());
             if (magic > RecordBatch.MAGIC_VALUE_V0) {
                 assertEquals(20L, result.maxTimestamp());
-                assertEquals(4L, result.shallowOffsetOfMaxTimestamp());
+                if (compression == CompressionType.NONE && magic < RecordBatch.MAGIC_VALUE_V2)
+                    assertEquals(4L, result.shallowOffsetOfMaxTimestamp());
+                else
+                    assertEquals(5L, result.shallowOffsetOfMaxTimestamp());
             }

             MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
diff --git a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
index ad1718c6a43..bcb9641e9e8 100644
--- a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
@@ -19,235 +19,77 @@ package kafka.admin

 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
-import kafka.utils.TestUtils.{createProducer, plaintextBootstrapServers}
-import kafka.utils.{TestInfoUtils, TestUtils}
+import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.common.utils.{MockTime, Time, Utils}
+import org.apache.kafka.common.utils.Utils
 import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}

-import java.util.Properties
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._

 class ListOffsetsIntegrationTest extends KafkaServerTestHarness {

   val topicName = "foo"
-  val topicNameWithCustomConfigs = "foo2"
   var adminClient: Admin = _
-  var setOldMessageFormat: Boolean = false
-  val mockTime: Time = new MockTime(1)

   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
-    createTopicWithConfig(topicName, new Properties())
+    createTopic(topicName, 1, 1.toShort)
+    produceMessages()
     adminClient = Admin.create(Map[String, Object](
       AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
     ).asJava)
   }

-  override def brokerTime(brokerId: Int): Time = mockTime
-
   @AfterEach
   override def tearDown(): Unit = {
-    setOldMessageFormat = false
     Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
     super.tearDown()
   }

-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = {
-    produceMessagesInOneBatch("gzip")
-    verifyListOffsets()
-
-    // test LogAppendTime case
-    val props: Properties = new Properties()
-    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
-    createTopicWithConfig(topicNameWithCustomConfigs, props)
-    produceMessagesInOneBatch("gzip", topicNameWithCustomConfigs)
-    // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
-    // So in this one batch test, it'll be the first offset 0
-    verifyListOffsets(topic = topicNameWithCustomConfigs, 0)
-  }
-
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testThreeRecordsInSeparateBatch(quorum: String): Unit = {
-    produceMessagesInSeparateBatch()
-    verifyListOffsets()
-  }
-
-  // The message conversion test only run in ZK mode because KRaft mode doesn't support "inter.broker.protocol.version" < 3.0
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk"))
-  def testThreeRecordsInOneBatchWithMessageConversion(quorum: String): Unit = {
-    createOldMessageFormatBrokers()
-    produceMessagesInOneBatch()
-    verifyListOffsets()
-
-    // test LogAppendTime case
-    val props: Properties = new Properties()
-    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
-    createTopicWithConfig(topicNameWithCustomConfigs, props)
-    produceMessagesInOneBatch(topic = topicNameWithCustomConfigs)
-    // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
-    // So in this one batch test, it'll be the first offset 0
-    verifyListOffsets(topic = topicNameWithCustomConfigs, 0)
-  }
-
-  // The message conversion test only run in ZK mode because KRaft mode doesn't support "inter.broker.protocol.version" < 3.0
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk"))
-  def testThreeRecordsInSeparateBatchWithMessageConversion(quorum: String): Unit = {
-    createOldMessageFormatBrokers()
-    produceMessagesInSeparateBatch()
-    verifyListOffsets()
-
-    // test LogAppendTime case
-    val props: Properties = new Properties()
-    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
-    createTopicWithConfig(topicNameWithCustomConfigs, props)
-    produceMessagesInSeparateBatch(topic = topicNameWithCustomConfigs)
-    // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
-    // So in this separate batch test, it'll be the last offset 2
-    verifyListOffsets(topic = topicNameWithCustomConfigs, 2)
-  }
-
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testThreeRecordsInOneBatchHavingDifferentCompressionTypeWithServer(quorum: String): Unit = {
-    val props: Properties = new Properties()
-    props.setProperty(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4")
-    createTopicWithConfig(topicNameWithCustomConfigs, props)
-    produceMessagesInOneBatch(topic = topicNameWithCustomConfigs)
-    verifyListOffsets(topic = topicNameWithCustomConfigs)
-  }
-
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testThreeRecordsInSeparateBatchHavingDifferentCompressionTypeWithServer(quorum: String): Unit = {
-    val props: Properties = new Properties()
-    props.setProperty(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4")
-    createTopicWithConfig(topicNameWithCustomConfigs, props)
-    produceMessagesInSeparateBatch(topic = topicNameWithCustomConfigs)
-    verifyListOffsets(topic = topicNameWithCustomConfigs)
-  }
-
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testThreeCompressedRecordsInSeparateBatch(quorum: String): Unit = {
-    produceMessagesInSeparateBatch("gzip")
-    verifyListOffsets()
-
-    // test LogAppendTime case
-    val props: Properties = new Properties()
-    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
-    createTopicWithConfig(topicNameWithCustomConfigs, props)
-    produceMessagesInSeparateBatch("gzip", topicNameWithCustomConfigs)
-    // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
-    // So in this separate batch test, it'll be the last offset 2
-    verifyListOffsets(topic = topicNameWithCustomConfigs, 2)
-  }
-
-  private def createOldMessageFormatBrokers(): Unit = {
-    setOldMessageFormat = true
-    recreateBrokers(reconfigure = true, startup = true)
-    Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
-    adminClient = Admin.create(Map[String, Object](
-      AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
-    ).asJava)
-  }
-
-  private def createTopicWithConfig(topic: String, props: Properties): Unit = {
-    createTopic(topic, 1, 1.toShort, topicConfig = props)
-  }
-
-  private def verifyListOffsets(topic: String = topicName, expectedMaxTimestampOffset: Int = 1): Unit = {
-    val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), topic)
+  @Test
+  def testEarliestOffset(): Unit = {
+    val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
     assertEquals(0, earliestOffset.offset())
+  }

-    val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), topic)
+  @Test
+  def testLatestOffset(): Unit = {
+    val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest())
     assertEquals(3, latestOffset.offset())
+  }

-    val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topic)
-    assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset())
+  @Test
+  def testMaxTimestampOffset(): Unit = {
+    val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp())
+    assertEquals(1, maxTimestampOffset.offset())
   }

   private def runFetchOffsets(adminClient: Admin,
-                              offsetSpec: OffsetSpec,
-                              topic: String): ListOffsetsResult.ListOffsetsResultInfo = {
-    val tp = new TopicPartition(topic, 0)
+                              offsetSpec: OffsetSpec): ListOffsetsResult.ListOffsetsResultInfo = {
+    val tp = new TopicPartition(topicName, 0)
     adminClient.listOffsets(Map(
       tp -> offsetSpec
     ).asJava, new ListOffsetsOptions()).all().get().get(tp)
   }

-  private def produceMessagesInOneBatch(compressionType: String = "none", topic: String = topicName): Unit = {
+  def produceMessages(): Unit = {
     val records = Seq(
-      new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, 100L,
-        null, new Array[Byte](10)),
-      new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, 999L,
-        null, new Array[Byte](10)),
-      new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, 200L,
-        null, new Array[Byte](10)),
+      new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 100L,
+        null, new Array[Byte](10000)),
+      new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 999L,
+        null, new Array[Byte](10000)),
+      new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 200L,
+        null, new Array[Byte](10000)),
     )
-    // create a producer with large linger.ms and enough batch.size (default is enough for three 10 bytes records),
-    // so that we can confirm all records will be accumulated in producer until we flush them into one batch.
-    val producer = createProducer(
-      plaintextBootstrapServers(brokers),
-      deliveryTimeoutMs = Int.MaxValue,
-      lingerMs = Int.MaxValue,
-      compressionType = compressionType)
-
-    try {
-      val futures = records.map(producer.send)
-      producer.flush()
-      futures.foreach(_.get)
-    } finally {
-      producer.close()
-    }
+    TestUtils.produceMessages(servers, records, -1)
   }

-  private def produceMessagesInSeparateBatch(compressionType: String = "none", topic: String = topicName): Unit = {
-    val records = Seq(new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, 100L,
-      null, new Array[Byte](10)))
-    val records2 = Seq(new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, 999L,
-      null, new Array[Byte](10)))
-    val records3 = Seq(new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, 200L,
-      null, new Array[Byte](10)))
-
-    val producer = createProducer(
-      plaintextBootstrapServers(brokers),
-      compressionType = compressionType)
-    try {
-      val futures = records.map(producer.send)
-      futures.foreach(_.get)
-      // advance the server time after each record sent to make sure the time changed when appendTime is used
-      mockTime.sleep(100)
-      val futures2 = records2.map(producer.send)
-      futures2.foreach(_.get)
-      mockTime.sleep(100)
-      val futures3 = records3.map(producer.send)
-      futures3.foreach(_.get)
-    } finally {
-      producer.close()
-    }
-  }
+  def generateConfigs: Seq[KafkaConfig] =
+    TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps)
+}

-  def generateConfigs: Seq[KafkaConfig] = {
-    TestUtils.createBrokerConfigs(1, zkConnectOrNull).map{ props =>
-      if (setOldMessageFormat) {
-        props.setProperty("log.message.format.version", "0.10.0")
-        props.setProperty("inter.broker.protocol.version", "0.10.0")
-      }
-      props
-    }.map(KafkaConfig.fromProps)
-  }
-}
\ No newline at end of file
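The removed verifyListOffsets helper exercised the same admin call that external clients use to find the offset of the record with the highest timestamp. For reference, a standalone sketch of that lookup; the bootstrap address is a placeholder, and the topic and partition mirror the test's foo-0:

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ListOffsetsOptions;
    import org.apache.kafka.clients.admin.ListOffsetsResult;
    import org.apache.kafka.clients.admin.OffsetSpec;
    import org.apache.kafka.common.TopicPartition;

    public class MaxTimestampOffsetLookup {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Placeholder bootstrap address; point this at a real cluster.
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (Admin admin = Admin.create(props)) {
                TopicPartition tp = new TopicPartition("foo", 0);
                ListOffsetsResult.ListOffsetsResultInfo info = admin
                    .listOffsets(Collections.singletonMap(tp, OffsetSpec.maxTimestamp()),
                        new ListOffsetsOptions())
                    .all().get().get(tp);
                // After this revert, compressed (or v2) batches make the broker report
                // the last offset of the batch holding the max timestamp, not
                // necessarily the offset of the record that carries it.
                System.out.println("offset=" + info.offset() + " timestamp=" + info.timestamp());
            }
        }
    }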
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index f2c3a717b7f..0a2cf270bf9 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -98,7 +98,7 @@ class KRaftQuorumImplementation(
   ): KafkaBroker = {
     val sharedServer = new SharedServer(config,
       new MetaProperties(clusterId, config.nodeId),
-      time,
+      Time.SYSTEM,
       new Metrics(),
       controllerQuorumVotersFuture,
       faultHandlerFactory)
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 5ffb037aff8..6b781f6fa69 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -219,8 +219,8 @@ class LogValidatorTest {
       "MessageSet should still valid")
     assertEquals(now, validatedResults.maxTimestampMs, s"Max timestamp should be $now")
-    assertEquals(0, validatedResults.shallowOffsetOfMaxTimestampMs,
-      s"The offset of max timestamp should be 0 if logAppendTime is used")
+    assertEquals(records.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestampMs,
+      s"The offset of max timestamp should be ${records.records.asScala.size - 1}")
     assertTrue(validatedResults.messageSizeMaybeChanged,
       "Message size may have been changed")

@@ -271,8 +271,8 @@ class LogValidatorTest {
       "MessageSet should still valid")
     assertEquals(now, validatedResults.maxTimestampMs, s"Max timestamp should be $now")
-    assertEquals(0, validatedResults.shallowOffsetOfMaxTimestampMs,
-      s"The offset of max timestamp should be 0 if logAppendTime is used")
+    assertEquals(records.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestampMs,
+      s"The offset of max timestamp should be ${records.records.asScala.size - 1}")
     assertFalse(validatedResults.messageSizeMaybeChanged,
       "Message size should not have been changed")

@@ -341,7 +341,6 @@ class LogValidatorTest {

   private def checkNonCompressed(magic: Byte): Unit = {
     val now = System.currentTimeMillis()
-    // set the timestamp of seq(1) (i.e. offset 1) as the max timestamp
     val timestampSeq = Seq(now - 1, now + 1, now)

     val (producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch) =
@@ -421,7 +420,6 @@ class LogValidatorTest {

   private def checkRecompression(magic: Byte): Unit = {
     val now = System.currentTimeMillis()
-    // set the timestamp of seq(1) (i.e. offset 1) as the max timestamp
     val timestampSeq = Seq(now - 1, now + 1, now)

     val (producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch) =
@@ -475,8 +473,8 @@ class LogValidatorTest {
     }
     assertEquals(now + 1, validatingResults.maxTimestampMs,
       s"Max timestamp should be ${now + 1}")
-    assertEquals(1, validatingResults.shallowOffsetOfMaxTimestampMs,
-      "Offset of max timestamp should be 1")
+    assertEquals(2, validatingResults.shallowOffsetOfMaxTimestampMs,
+      "Offset of max timestamp should be 2")
     assertTrue(validatingResults.messageSizeMaybeChanged,
       "Message size should have been changed")

@@ -527,8 +525,8 @@ class LogValidatorTest {
     }
     assertEquals(validatedResults.maxTimestampMs, RecordBatch.NO_TIMESTAMP,
       s"Max timestamp should be ${RecordBatch.NO_TIMESTAMP}")
-    assertEquals(-1, validatedResults.shallowOffsetOfMaxTimestampMs,
-      s"Offset of max timestamp should be -1")
+    assertEquals(validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestampMs,
+      s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}")
     assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed")
     verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
@@ -574,8 +572,8 @@ class LogValidatorTest {
       assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence)
     }
     assertEquals(timestamp, validatedResults.maxTimestampMs)
-    assertEquals(0, validatedResults.shallowOffsetOfMaxTimestampMs,
-      s"Offset of max timestamp should be 0 when multiple records having the same max timestamp.")
+    assertEquals(validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestampMs,
+      s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}")
     assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed")
     verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
@@ -589,7 +587,6 @@ class LogValidatorTest {

   private def checkCompressed(magic: Byte): Unit = {
     val now = System.currentTimeMillis()
-    // set the timestamp of seq(1) (i.e. offset 1) as the max timestamp
     val timestampSeq = Seq(now - 1, now + 1, now)

     val (producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch) =
@@ -642,8 +639,8 @@ class LogValidatorTest {
       }
     }
     assertEquals(now + 1, validatedResults.maxTimestampMs, s"Max timestamp should be ${now + 1}")
-    assertEquals(1, validatedResults.shallowOffsetOfMaxTimestampMs,
-      s"Offset of max timestamp should be 1")
+    assertEquals(validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestampMs,
+      s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}")
     assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed")
     verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 0, records,
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
index c8fd50e468d..a05512a0196 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
@@ -329,8 +329,6 @@ public class LogValidator {
         long maxTimestamp = RecordBatch.NO_TIMESTAMP;
         LongRef expectedInnerOffset = PrimitiveRef.ofLong(0);
         List<Record> validatedRecords = new ArrayList<>();
-        long offsetOfMaxTimestamp = -1;
-        long initialOffset = offsetCounter.value;

         int uncompressedSizeInBytes = 0;

@@ -380,11 +378,8 @@ public class LogValidator {
                     && batch.magic() > RecordBatch.MAGIC_VALUE_V0
                     && toMagic > RecordBatch.MAGIC_VALUE_V0) {
-                if (record.timestamp() > maxTimestamp) {
+                if (record.timestamp() > maxTimestamp)
                     maxTimestamp = record.timestamp();
-                    // The offset is only increased when it is a valid record
-                    offsetOfMaxTimestamp = initialOffset + validatedRecords.size();
-                }

                 // Some older clients do not implement the V1 internal offsets correctly.
                 // Historically the broker handled this by rewriting the batches rather
@@ -421,10 +416,8 @@ public class LogValidator {
         long lastOffset = offsetCounter.value - 1;
         firstBatch.setLastOffset(lastOffset);

-        if (timestampType == TimestampType.LOG_APPEND_TIME) {
+        if (timestampType == TimestampType.LOG_APPEND_TIME)
             maxTimestamp = now;
-            offsetOfMaxTimestamp = initialOffset;
-        }

         if (toMagic >= RecordBatch.MAGIC_VALUE_V1)
             firstBatch.setMaxTimestamp(timestampType, maxTimestamp);
@@ -437,7 +430,7 @@ public class LogValidator {
             now,
             records,
             maxTimestamp,
-            offsetOfMaxTimestamp,
+            lastOffset,
             false,
             recordConversionStats);
     }
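The LogValidator change above is what makes the broker-side answer "shallow" again for compressed data: the validator stops tracking the individual record that carries the max timestamp and reports the batch's last offset instead. A sketch of that semantics in terms of batch-level metadata only; this is an illustration of the concept, not the broker's actual code path:

    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.RecordBatch;

    public final class ShallowMaxTimestamp {
        // Scan batch-level metadata only: for whichever batch carries the max
        // timestamp, report that batch's last offset. This mirrors the "shallow"
        // semantics the revert restores for compressed and v2 batches.
        public static long[] maxTimestampAndShallowOffset(MemoryRecords records) {
            long maxTimestamp = RecordBatch.NO_TIMESTAMP;
            long shallowOffset = -1L;
            for (RecordBatch batch : records.batches()) {
                if (batch.maxTimestamp() > maxTimestamp) {
                    maxTimestamp = batch.maxTimestamp();
                    shallowOffset = batch.lastOffset(); // last offset of the owning batch
                }
            }
            return new long[] {maxTimestamp, shallowOffset};
        }
    }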