This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 4dd893ba212 KAFKA-806 Index may not always observe 
log.index.interval.bytes (#18842)
4dd893ba212 is described below

commit 4dd893ba212d05fb2201d884d962f0c0f81ebb10
Author: PoAn Yang <[email protected]>
AuthorDate: Thu Mar 20 16:38:54 2025 +0800

    KAFKA-806 Index may not always observe log.index.interval.bytes (#18842)
    
    Currently, each log.append() adds at most one index entry, even when
    the appended data is larger than log.index.interval.bytes. One potential
    issue is that a follower that restarts after being down for a long time
    may fetch much more than log.index.interval.bytes of data at a time.
    Fewer index entries are then created, which can increase fetch time for
    consumers. With this change, LogSegment.append() iterates over the
    appended batches and checks whether an index entry is needed after each
    batch, instead of once per append.
    
    (cherry picked from commit e124d3975bdb3a9ec85eee2fba7a1b0a6967d3a6)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 core/src/main/scala/kafka/log/LogCleaner.scala     |   2 +-
 core/src/main/scala/kafka/log/UnifiedLog.scala     |   5 +-
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala |   4 +-
 .../unit/kafka/server/MockFetcherThread.scala      |   1 -
 .../kafka/server/ReplicaFetcherThreadTest.scala    |   1 -
 .../unit/kafka/tools/DumpLogSegmentsTest.scala     |   4 +-
 .../kafka/storage/internals/log/LocalLog.java      |   4 +-
 .../kafka/storage/internals/log/LogAppendInfo.java |  75 ++++------
 .../kafka/storage/internals/log/LogSegment.java    |  46 +++---
 .../kafka/storage/internals/log/LogValidator.java  |  35 +----
 .../kafka/storage/internals/log/LocalLogTest.java  |   2 -
 .../storage/internals/log/LogSegmentTest.java      | 155 ++++++++++++++++-----
 .../storage/internals/log/LogValidatorTest.java    |  24 ----
 13 files changed, 181 insertions(+), 177 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala 
b/core/src/main/scala/kafka/log/LogCleaner.scala
index a4f96ff7e63..5d7ee518963 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -824,7 +824,7 @@ private[log] class Cleaner(val id: Int,
         val retained = MemoryRecords.readableRecords(outputBuffer)
         // it's OK not to hold the Log's lock in this case, because this 
segment is only accessed by other threads
         // after `Log.replaceSegments` (which acquires the lock) is called
-        dest.append(result.maxOffset, result.maxTimestamp, 
result.shallowOffsetOfMaxTimestamp(), retained)
+        dest.append(result.maxOffset, retained)
         throttler.maybeThrottle(outputBuffer.limit())
       }
 
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala 
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 9a977a262b6..3c1a1570871 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -816,7 +816,6 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
             validRecords = validateAndOffsetAssignResult.validatedRecords
             
appendInfo.setMaxTimestamp(validateAndOffsetAssignResult.maxTimestampMs)
-            
appendInfo.setShallowOffsetOfMaxTimestamp(validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp)
             appendInfo.setLastOffset(offset.value - 1)
             
appendInfo.setRecordValidationStats(validateAndOffsetAssignResult.recordValidationStats)
             if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
@@ -902,7 +901,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
               // will be cleaned up after the log directory is recovered. Note 
that the end offset of the
               // ProducerStateManager will not be updated and the last stable 
offset will not advance
               // if the append to the transaction index fails.
-              localLog.append(appendInfo.lastOffset, appendInfo.maxTimestamp, 
appendInfo.shallowOffsetOfMaxTimestamp, validRecords)
+              localLog.append(appendInfo.lastOffset, validRecords)
               updateHighWatermarkWithLogEndOffset()
 
               // update the producer state
@@ -1183,7 +1182,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     else
       OptionalInt.empty()
 
-    new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpochOpt, 
maxTimestamp, shallowOffsetOfMaxTimestamp,
+    new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpochOpt, 
maxTimestamp,
       RecordBatch.NO_TIMESTAMP, logStartOffset, RecordValidationStats.EMPTY, 
sourceCompression,
       validBytesCount, lastOffsetOfFirstBatch, 
Collections.emptyList[RecordError], LeaderHwChange.NONE)
   }
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala 
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index bbcda01451f..9d208055a58 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -4455,8 +4455,8 @@ class UnifiedLogTest {
     segments.add(seg2)
     assertEquals(Seq(Long.MaxValue, Long.MaxValue), 
log.getFirstBatchTimestampForSegments(segments).asScala.toSeq)
 
-    seg1.append(1, 1000L, 1, MemoryRecords.withRecords(1, Compression.NONE, 
new SimpleRecord("one".getBytes)))
-    seg2.append(2, 2000L, 1, MemoryRecords.withRecords(2, Compression.NONE, 
new SimpleRecord("two".getBytes)))
+    seg1.append(1, MemoryRecords.withRecords(1, Compression.NONE, new 
SimpleRecord(1000L, "one".getBytes)))
+    seg2.append(2, MemoryRecords.withRecords(2, Compression.NONE, new 
SimpleRecord(2000L, "two".getBytes)))
     assertEquals(Seq(1000L, 2000L), 
log.getFirstBatchTimestampForSegments(segments).asScala.toSeq)
 
     seg1.close()
diff --git a/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala 
b/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala
index 2754685b8f4..5d50de04095 100644
--- a/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala
+++ b/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala
@@ -107,7 +107,6 @@ class MockFetcherThread(val mockLeader: MockLeaderEndPoint,
       lastOffset,
       lastEpoch,
       maxTimestamp,
-      shallowOffsetOfMaxTimestamp,
       Time.SYSTEM.milliseconds(),
       state.logStartOffset,
       RecordValidationStats.EMPTY,
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index ff556f586c4..6526d6628c3 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -465,7 +465,6 @@ class ReplicaFetcherThreadTest {
       0,
       OptionalInt.empty,
       RecordBatch.NO_TIMESTAMP,
-      -1L,
       RecordBatch.NO_TIMESTAMP,
       -1L,
       RecordValidationStats.EMPTY,
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala 
b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index b86a5608c3d..bf8cafac629 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -36,7 +36,7 @@ import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.metadata.{PartitionChangeRecord, 
RegisterBrokerRecord, TopicRecord}
 import org.apache.kafka.common.protocol.{ByteBufferAccessor, 
ObjectSerializationCache}
-import org.apache.kafka.common.record.{ControlRecordType, 
EndTransactionMarker, MemoryRecords, Record, RecordBatch, RecordVersion, 
SimpleRecord}
+import org.apache.kafka.common.record.{ControlRecordType, 
EndTransactionMarker, MemoryRecords, Record, RecordVersion, SimpleRecord}
 import org.apache.kafka.common.utils.{Exit, Utils}
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord
 import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde
@@ -402,7 +402,7 @@ class DumpLogSegmentsTest {
     log = LogTestUtils.createLog(logDir, logConfig, new BrokerTopicStats, 
time.scheduler, time)
     log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, 
metadataRecords:_*), leaderEpoch = 0)
     val secondSegment = log.roll()
-    secondSegment.append(1L, RecordBatch.NO_TIMESTAMP, 1L, 
MemoryRecords.withRecords(Compression.NONE, metadataRecords:_*))
+    secondSegment.append(1L, MemoryRecords.withRecords(Compression.NONE, 
metadataRecords: _*))
     secondSegment.flush()
     log.flush(true)
     
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java
index 027196a5de7..817da5c8318 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java
@@ -525,8 +525,8 @@ public class LocalLog {
         );
     }
 
-    public void append(long lastOffset, long largestTimestamp, long 
shallowOffsetOfMaxTimestamp, MemoryRecords records) throws IOException {
-        segments.activeSegment().append(lastOffset, largestTimestamp, 
shallowOffsetOfMaxTimestamp, records);
+    public void append(long lastOffset, MemoryRecords records) throws 
IOException {
+        segments.activeSegment().append(lastOffset, records);
         updateLogEndOffset(lastOffset + 1);
     }
 
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java
index 05e162a3042..63a8a510818 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java
@@ -31,13 +31,12 @@ import java.util.OptionalInt;
 public class LogAppendInfo {
 
     public static final LogAppendInfo UNKNOWN_LOG_APPEND_INFO = new 
LogAppendInfo(-1, -1, OptionalInt.empty(),
-            RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L,
+            RecordBatch.NO_TIMESTAMP, RecordBatch.NO_TIMESTAMP, -1L,
             RecordValidationStats.EMPTY, CompressionType.NONE, -1, -1L);
 
     private long firstOffset;
     private long lastOffset;
     private long maxTimestamp;
-    private long shallowOffsetOfMaxTimestamp;
     private long logAppendTime;
     private long logStartOffset;
     private RecordValidationStats recordValidationStats;
@@ -52,31 +51,29 @@ public class LogAppendInfo {
     /**
      * Creates an instance with the given params.
      *
-     * @param firstOffset                   The first offset in the message 
set unless the message format is less than V2 and we are appending
-     *                                      to the follower.
-     * @param lastOffset                    The last offset in the message set
-     * @param lastLeaderEpoch               The partition leader epoch 
corresponding to the last offset, if available.
-     * @param maxTimestamp                  The maximum timestamp of the 
message set.
-     * @param shallowOffsetOfMaxTimestamp   The last offset of the batch with 
the maximum timestamp.
-     * @param logAppendTime                 The log append time (if used) of 
the message set, otherwise Message.NoTimestamp
-     * @param logStartOffset                The start offset of the log at the 
time of this append.
-     * @param recordValidationStats         Statistics collected during record 
processing, `null` if `assignOffsets` is `false`
-     * @param sourceCompression             The source codec used in the 
message set (send by the producer)
-     * @param validBytes                    The number of valid bytes
-     * @param lastOffsetOfFirstBatch        The last offset of the first batch
+     * @param firstOffset            The first offset in the message set 
unless the message format is less than V2 and we are appending
+     *                               to the follower.
+     * @param lastOffset             The last offset in the message set
+     * @param lastLeaderEpoch        The partition leader epoch corresponding 
to the last offset, if available.
+     * @param maxTimestamp           The maximum timestamp of the message set.
+     * @param logAppendTime          The log append time (if used) of the 
message set, otherwise Message.NoTimestamp
+     * @param logStartOffset         The start offset of the log at the time 
of this append.
+     * @param recordValidationStats  Statistics collected during record 
processing, `null` if `assignOffsets` is `false`
+     * @param sourceCompression      The source codec used in the message set 
(send by the producer)
+     * @param validBytes             The number of valid bytes
+     * @param lastOffsetOfFirstBatch The last offset of the first batch
      */
     public LogAppendInfo(long firstOffset,
                          long lastOffset,
                          OptionalInt lastLeaderEpoch,
                          long maxTimestamp,
-                         long shallowOffsetOfMaxTimestamp,
                          long logAppendTime,
                          long logStartOffset,
                          RecordValidationStats recordValidationStats,
                          CompressionType sourceCompression,
                          int validBytes,
                          long lastOffsetOfFirstBatch) {
-        this(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, 
shallowOffsetOfMaxTimestamp, logAppendTime, logStartOffset,
+        this(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, 
logAppendTime, logStartOffset,
             recordValidationStats, sourceCompression, validBytes, 
lastOffsetOfFirstBatch, Collections.emptyList(),
                 LeaderHwChange.NONE);
     }
@@ -84,27 +81,25 @@ public class LogAppendInfo {
     /**
      * Creates an instance with the given params.
      *
-     * @param firstOffset                   The first offset in the message 
set unless the message format is less than V2 and we are appending
-     *                                      to the follower.
-     * @param lastOffset                    The last offset in the message set
-     * @param lastLeaderEpoch               The partition leader epoch 
corresponding to the last offset, if available.
-     * @param maxTimestamp                  The maximum timestamp of the 
message set.
-     * @param shallowOffsetOfMaxTimestamp   The last offset of the batch with 
the maximum timestamp.
-     * @param logAppendTime                 The log append time (if used) of 
the message set, otherwise Message.NoTimestamp
-     * @param logStartOffset                The start offset of the log at the 
time of this append.
-     * @param recordValidationStats         Statistics collected during record 
processing, `null` if `assignOffsets` is `false`
-     * @param sourceCompression             The source codec used in the 
message set (send by the producer)
-     * @param validBytes                    The number of valid bytes
-     * @param lastOffsetOfFirstBatch        The last offset of the first batch
-     * @param recordErrors                  List of record errors that caused 
the respective batch to be dropped
-     * @param leaderHwChange                Incremental if the high watermark 
needs to be increased after appending record
-     *                                      Same if high watermark is not 
changed. None is the default value and it means append failed
+     * @param firstOffset            The first offset in the message set 
unless the message format is less than V2 and we are appending
+     *                               to the follower.
+     * @param lastOffset             The last offset in the message set
+     * @param lastLeaderEpoch        The partition leader epoch corresponding 
to the last offset, if available.
+     * @param maxTimestamp           The maximum timestamp of the message set.
+     * @param logAppendTime          The log append time (if used) of the 
message set, otherwise Message.NoTimestamp
+     * @param logStartOffset         The start offset of the log at the time 
of this append.
+     * @param recordValidationStats  Statistics collected during record 
processing, `null` if `assignOffsets` is `false`
+     * @param sourceCompression      The source codec used in the message set 
(send by the producer)
+     * @param validBytes             The number of valid bytes
+     * @param lastOffsetOfFirstBatch The last offset of the first batch
+     * @param recordErrors           List of record errors that caused the 
respective batch to be dropped
+     * @param leaderHwChange         Incremental if the high watermark needs 
to be increased after appending record
+     *                               Same if high watermark is not changed. 
None is the default value and it means append failed
      */
     public LogAppendInfo(long firstOffset,
                          long lastOffset,
                          OptionalInt lastLeaderEpoch,
                          long maxTimestamp,
-                         long shallowOffsetOfMaxTimestamp,
                          long logAppendTime,
                          long logStartOffset,
                          RecordValidationStats recordValidationStats,
@@ -117,7 +112,6 @@ public class LogAppendInfo {
         this.lastOffset = lastOffset;
         this.lastLeaderEpoch = lastLeaderEpoch;
         this.maxTimestamp = maxTimestamp;
-        this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
         this.logAppendTime = logAppendTime;
         this.logStartOffset = logStartOffset;
         this.recordValidationStats = recordValidationStats;
@@ -156,14 +150,6 @@ public class LogAppendInfo {
         this.maxTimestamp = maxTimestamp;
     }
 
-    public long shallowOffsetOfMaxTimestamp() {
-        return shallowOffsetOfMaxTimestamp;
-    }
-
-    public void setShallowOffsetOfMaxTimestamp(long 
shallowOffsetOfMaxTimestamp) {
-        this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
-    }
-
     public long logAppendTime() {
         return logAppendTime;
     }
@@ -233,12 +219,12 @@ public class LogAppendInfo {
      * @return a new instance with the given LeaderHwChange
      */
     public LogAppendInfo copy(LeaderHwChange newLeaderHwChange) {
-        return new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpoch, 
maxTimestamp, shallowOffsetOfMaxTimestamp, logAppendTime, logStartOffset, 
recordValidationStats,
+        return new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpoch, 
maxTimestamp, logAppendTime, logStartOffset, recordValidationStats,
                 sourceCompression, validBytes, lastOffsetOfFirstBatch, 
recordErrors, newLeaderHwChange);
     }
 
     public static LogAppendInfo unknownLogAppendInfoWithLogStartOffset(long 
logStartOffset) {
-        return new LogAppendInfo(-1, -1, OptionalInt.empty(), 
RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
+        return new LogAppendInfo(-1, -1, OptionalInt.empty(), 
RecordBatch.NO_TIMESTAMP, RecordBatch.NO_TIMESTAMP, logStartOffset,
                 RecordValidationStats.EMPTY, CompressionType.NONE, -1, -1L);
     }
 
@@ -248,7 +234,7 @@ public class LogAppendInfo {
      * in unknownLogAppendInfoWithLogStartOffset, but with additional fields 
recordErrors
      */
     public static LogAppendInfo unknownLogAppendInfoWithAdditionalInfo(long 
logStartOffset, List<RecordError> recordErrors) {
-        return new LogAppendInfo(-1, -1, OptionalInt.empty(), 
RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
+        return new LogAppendInfo(-1, -1, OptionalInt.empty(), 
RecordBatch.NO_TIMESTAMP, RecordBatch.NO_TIMESTAMP, logStartOffset,
                 RecordValidationStats.EMPTY, CompressionType.NONE, -1, -1L, 
recordErrors, LeaderHwChange.NONE);
     }
 
@@ -259,7 +245,6 @@ public class LogAppendInfo {
                 ", lastOffset=" + lastOffset +
                 ", lastLeaderEpoch=" + lastLeaderEpoch +
                 ", maxTimestamp=" + maxTimestamp +
-                ", shallowOffsetOfMaxTimestamp=" + shallowOffsetOfMaxTimestamp 
+
                 ", logAppendTime=" + logAppendTime +
                 ", logStartOffset=" + logStartOffset +
                 ", recordConversionStats=" + recordValidationStats +
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
index 5148408c0e5..b388af1f798 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
@@ -232,38 +232,38 @@ public class LogSegment implements Closeable {
      * It is assumed this method is being called from within a lock, it is not 
thread-safe otherwise.
      *
      * @param largestOffset The last offset in the message set
-     * @param largestTimestampMs The largest timestamp in the message set.
-     * @param shallowOffsetOfMaxTimestamp The last offset of earliest batch 
with max timestamp in the messages to append.
-     * @param records The log entries to append.
+     * @param records       The log entries to append.
      * @throws LogSegmentOffsetOverflowException if the largest offset causes 
index offset overflow
      */
     public void append(long largestOffset,
-                       long largestTimestampMs,
-                       long shallowOffsetOfMaxTimestamp,
                        MemoryRecords records) throws IOException {
         if (records.sizeInBytes() > 0) {
-            LOGGER.trace("Inserting {} bytes at end offset {} at position {} 
with largest timestamp {} at offset {}",
-                records.sizeInBytes(), largestOffset, log.sizeInBytes(), 
largestTimestampMs, shallowOffsetOfMaxTimestamp);
+            LOGGER.trace("Inserting {} bytes at end offset {} at position {}",
+                records.sizeInBytes(), largestOffset, log.sizeInBytes());
             int physicalPosition = log.sizeInBytes();
-            if (physicalPosition == 0)
-                rollingBasedTimestamp = OptionalLong.of(largestTimestampMs);
 
             ensureOffsetInRange(largestOffset);
 
             // append the messages
             long appendedBytes = log.append(records);
             LOGGER.trace("Appended {} to {} at end offset {}", appendedBytes, 
log.file(), largestOffset);
-            // Update the in memory max timestamp and corresponding offset.
-            if (largestTimestampMs > maxTimestampSoFar()) {
-                maxTimestampAndOffsetSoFar = new 
TimestampOffset(largestTimestampMs, shallowOffsetOfMaxTimestamp);
-            }
-            // append an entry to the index (if needed)
-            if (bytesSinceLastIndexEntry > indexIntervalBytes) {
-                offsetIndex().append(largestOffset, physicalPosition);
-                timeIndex().maybeAppend(maxTimestampSoFar(), 
shallowOffsetOfMaxTimestampSoFar());
-                bytesSinceLastIndexEntry = 0;
+
+            for (RecordBatch batch : records.batches()) {
+                long batchMaxTimestamp = batch.maxTimestamp();
+                long batchLastOffset = batch.lastOffset();
+                if (batchMaxTimestamp > maxTimestampSoFar()) {
+                    maxTimestampAndOffsetSoFar = new 
TimestampOffset(batchMaxTimestamp, batchLastOffset);
+                }
+
+                if (bytesSinceLastIndexEntry > indexIntervalBytes) {
+                    offsetIndex().append(batchLastOffset, physicalPosition);
+                    timeIndex().maybeAppend(maxTimestampSoFar(), 
shallowOffsetOfMaxTimestampSoFar());
+                    bytesSinceLastIndexEntry = 0;
+                }
+                var sizeInBytes = batch.sizeInBytes();
+                physicalPosition += sizeInBytes;
+                bytesSinceLastIndexEntry += sizeInBytes;
             }
-            bytesSinceLastIndexEntry += records.sizeInBytes();
         }
     }
 
@@ -274,8 +274,6 @@ public class LogSegment implements Closeable {
 
     private int appendChunkFromFile(FileRecords records, int position, 
BufferSupplier bufferSupplier) throws IOException {
         int bytesToAppend = 0;
-        long maxTimestamp = Long.MIN_VALUE;
-        long shallowOffsetOfMaxTimestamp = Long.MIN_VALUE;
         long maxOffset = Long.MIN_VALUE;
         ByteBuffer readBuffer = bufferSupplier.get(1024 * 1024);
 
@@ -284,10 +282,6 @@ public class LogSegment implements Closeable {
         Iterator<FileChannelRecordBatch> nextBatches = 
records.batchesFrom(position).iterator();
         FileChannelRecordBatch batch;
         while ((batch = nextAppendableBatch(nextBatches, readBuffer, 
bytesToAppend)) != null) {
-            if (batch.maxTimestamp() > maxTimestamp) {
-                maxTimestamp = batch.maxTimestamp();
-                shallowOffsetOfMaxTimestamp = batch.lastOffset();
-            }
             maxOffset = batch.lastOffset();
             bytesToAppend += batch.sizeInBytes();
         }
@@ -300,7 +294,7 @@ public class LogSegment implements Closeable {
             readBuffer.limit(bytesToAppend);
             records.readInto(readBuffer, position);
 
-            append(maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp, 
MemoryRecords.readableRecords(readBuffer));
+            append(maxOffset, MemoryRecords.readableRecords(readBuffer));
         }
 
         bufferSupplier.release(readBuffer);
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
index fc2ba45d101..d74ac0a8b3a 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
@@ -65,20 +65,15 @@ public class LogValidator {
         public final long logAppendTimeMs;
         public final MemoryRecords validatedRecords;
         public final long maxTimestampMs;
-        // we only maintain batch level offset for max timestamp since we want 
to align the behavior of updating time
-        // indexing entries. The paths of follower append and replica recovery 
do not iterate all records, so they have no
-        // idea about record level offset for max timestamp.
-        public final long shallowOffsetOfMaxTimestamp;
         public final boolean messageSizeMaybeChanged;
         public final RecordValidationStats recordValidationStats;
 
         public ValidationResult(long logAppendTimeMs, MemoryRecords 
validatedRecords, long maxTimestampMs,
-                                long shallowOffsetOfMaxTimestamp, boolean 
messageSizeMaybeChanged,
+                                boolean messageSizeMaybeChanged,
                                 RecordValidationStats recordValidationStats) {
             this.logAppendTimeMs = logAppendTimeMs;
             this.validatedRecords = validatedRecords;
             this.maxTimestampMs = maxTimestampMs;
-            this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
             this.messageSizeMaybeChanged = messageSizeMaybeChanged;
             this.recordValidationStats = recordValidationStats;
         }
@@ -229,7 +224,6 @@ public class LogValidator {
             now,
             convertedRecords,
             info.maxTimestamp,
-            info.shallowOffsetOfMaxTimestamp,
             true,
             recordValidationStats);
     }
@@ -239,8 +233,6 @@ public class LogValidator {
                                                        MetricsRecorder 
metricsRecorder) {
         long now = time.milliseconds();
         long maxTimestamp = RecordBatch.NO_TIMESTAMP;
-        long shallowOffsetOfMaxTimestamp = -1L;
-        long initialOffset = offsetCounter.value;
 
         RecordBatch firstBatch = 
getFirstBatchAndMaybeValidateNoMoreBatches(records, CompressionType.NONE);
 
@@ -269,7 +261,6 @@ public class LogValidator {
 
             if (batch.magic() > RecordBatch.MAGIC_VALUE_V0 && 
maxBatchTimestamp > maxTimestamp) {
                 maxTimestamp = maxBatchTimestamp;
-                shallowOffsetOfMaxTimestamp = offsetCounter.value - 1;
             }
 
             batch.setLastOffset(offsetCounter.value - 1);
@@ -286,23 +277,10 @@ public class LogValidator {
         }
 
         if (timestampType == TimestampType.LOG_APPEND_TIME) {
-            maxTimestamp = now;
-            // those checks should be equal to MemoryRecordsBuilder#info
-            switch (toMagic) {
-                case RecordBatch.MAGIC_VALUE_V0:
-                    maxTimestamp = RecordBatch.NO_TIMESTAMP;
-                    // value will be the default value: -1
-                    shallowOffsetOfMaxTimestamp = -1;
-                    break;
-                case RecordBatch.MAGIC_VALUE_V1:
-                    // Those single-record batches have same max timestamp, so 
the initial offset is equal with
-                    // the last offset of earliest batch
-                    shallowOffsetOfMaxTimestamp = initialOffset;
-                    break;
-                default:
-                    // there is only one batch so use the last offset
-                    shallowOffsetOfMaxTimestamp = offsetCounter.value - 1;
-                    break;
+            if (toMagic == RecordBatch.MAGIC_VALUE_V0) {
+                maxTimestamp = RecordBatch.NO_TIMESTAMP;
+            } else {
+                maxTimestamp = now;
             }
         }
 
@@ -310,7 +288,6 @@ public class LogValidator {
             now,
             records,
             maxTimestamp,
-            shallowOffsetOfMaxTimestamp,
             false,
             RecordValidationStats.EMPTY);
     }
@@ -434,7 +411,6 @@ public class LogValidator {
                 now,
                 records,
                 maxTimestamp,
-                lastOffset,
                 false,
                 recordValidationStats);
         }
@@ -476,7 +452,6 @@ public class LogValidator {
             logAppendTime,
             records,
             info.maxTimestamp,
-            info.shallowOffsetOfMaxTimestamp,
             true,
             recordValidationStats);
     }
diff --git 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LocalLogTest.java
 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LocalLogTest.java
index 00b53de34ef..a638f03abde 100644
--- 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LocalLogTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LocalLogTest.java
@@ -114,8 +114,6 @@ class LocalLogTest {
 
     private void appendRecords(List<SimpleRecord> records, long initialOffset) 
throws IOException {
         log.append(initialOffset + records.size() - 1,
-                records.get(0).timestamp(),
-                initialOffset,
                 MemoryRecords.withRecords(initialOffset, Compression.NONE, 0, 
records.toArray(new SimpleRecord[0])));
     }
 
diff --git 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
index 56bdcf7240f..b798378f1af 100644
--- 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.record.EndTransactionMarker;
 import org.apache.kafka.common.record.FileLogInputStream;
 import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.Records;
@@ -48,6 +49,7 @@ import org.junit.jupiter.params.provider.ValueSource;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -145,7 +147,7 @@ public class LogSegmentTest {
         try (LogSegment seg = createSegment(baseOffset, 10, Time.SYSTEM)) {
             long currentTime = Time.SYSTEM.milliseconds();
             MemoryRecords memoryRecords = v1Records(0, "hello");
-            assertThrows(LogSegmentOffsetOverflowException.class, () -> 
seg.append(largestOffset, currentTime, largestOffset, memoryRecords));
+            assertThrows(LogSegmentOffsetOverflowException.class, () -> 
seg.append(largestOffset, memoryRecords));
         }
     }
 
@@ -168,7 +170,7 @@ public class LogSegmentTest {
     public void testReadBeforeFirstOffset() throws IOException {
         try (LogSegment seg = createSegment(40)) {
             MemoryRecords ms = v1Records(50, "hello", "there", "little", 
"bee");
-            seg.append(53, RecordBatch.NO_TIMESTAMP, -1L, ms);
+            seg.append(53, ms);
             Records read = seg.read(41, 300).records;
             checkEquals(ms.records().iterator(), read.records().iterator());
         }
@@ -183,7 +185,7 @@ public class LogSegmentTest {
         long batchBaseOffset = 50;
         try (LogSegment seg = createSegment(40)) {
             MemoryRecords ms = v2Records(batchBaseOffset, "hello", "there", 
"little", "bee");
-            seg.append(53, RecordBatch.NO_TIMESTAMP, -1L, ms);
+            seg.append(53, ms);
             FetchDataInfo readInfo = seg.read(52, 300);
             assertEquals(batchBaseOffset, 
readInfo.fetchOffsetMetadata.messageOffset);
         }
@@ -196,7 +198,7 @@ public class LogSegmentTest {
     public void testReadAfterLast() throws IOException {
         try (LogSegment seg = createSegment(40)) {
             MemoryRecords ms = v1Records(50, "hello", "there");
-            seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
+            seg.append(51, ms);
             FetchDataInfo read = seg.read(52, 200);
             assertNull(read, "Read beyond the last offset in the segment 
should give null");
         }
@@ -210,9 +212,9 @@ public class LogSegmentTest {
     public void testReadFromGap() throws IOException {
         try (LogSegment seg = createSegment(40)) {
             MemoryRecords ms = v1Records(50, "hello", "there");
-            seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
+            seg.append(51, ms);
             MemoryRecords ms2 = v1Records(60, "alpha", "beta");
-            seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2);
+            seg.append(61, ms2);
             FetchDataInfo read = seg.read(55, 200);
             checkEquals(ms2.records().iterator(), 
read.records.records().iterator());
         }
@@ -225,7 +227,7 @@ public class LogSegmentTest {
         int maxSize = 1;
         try (LogSegment seg = createSegment(40)) {
             MemoryRecords ms = v1Records(50, "hello", "there");
-            seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
+            seg.append(51, ms);
 
             // read at first offset
             FetchDataInfo read = seg.read(50, maxSize, maxPosition, 
minOneMessage);
@@ -257,9 +259,9 @@ public class LogSegmentTest {
             long offset = 40;
             for (int i = 0; i < 30; i++) {
                 MemoryRecords ms1 = v1Records(offset, "hello");
-                seg.append(offset, RecordBatch.NO_TIMESTAMP, -1L, ms1);
+                seg.append(offset, ms1);
                 MemoryRecords ms2 = v1Records(offset + 1, "hello");
-                seg.append(offset + 1, RecordBatch.NO_TIMESTAMP, -1L, ms2);
+                seg.append(offset + 1, ms2);
 
                 // check that we can read back both messages
                 FetchDataInfo read = seg.read(offset, 10000);
@@ -320,7 +322,7 @@ public class LogSegmentTest {
         try (LogSegment seg = createSegment(40, 2 * v1Records(0, 
"hello").sizeInBytes() - 1)) {
             int offset = 40;
             for (int i = 0; i < numMessages; i++) {
-                seg.append(offset, offset, offset, v1Records(offset, "hello"));
+                seg.append(offset, v1Records(offset, "hello"));
                 offset++;
             }
             assertEquals(offset, seg.readNextOffset());
@@ -343,7 +345,12 @@ public class LogSegmentTest {
         MockTime time = new MockTime();
         try (LogSegment seg = createSegment(40, time)) {
 
-            seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, v1Records(40, 
"hello", "there"));
+            seg.append(41,
+                MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 40, 
Compression.NONE, TimestampType.CREATE_TIME,
+                    List.of(
+                        new SimpleRecord("hello".getBytes()),
+                        new SimpleRecord("there".getBytes())
+                    ).toArray(new SimpleRecord[0])));
 
             // If the segment is empty after truncation, the create time 
should be reset
             time.sleep(500);
@@ -355,7 +362,7 @@ public class LogSegmentTest {
             assertFalse(seg.offsetIndex().isFull());
             assertNull(seg.read(0, 1024), "Segment should be empty.");
 
-            seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, v1Records(40, 
"hello", "there"));
+            seg.append(41, v1Records(40, "hello", "there"));
         }
     }
 
@@ -368,7 +375,7 @@ public class LogSegmentTest {
         try (LogSegment seg = createSegment(40, messageSize * 2 - 1)) {
             // Produce some messages
             for (int i = 40; i < 50; i++) {
-                seg.append(i, i * 10, i, v1Records(i, "msg" + i));
+                seg.append(i, v1Records(i, "msg" + i));
             }
 
             assertEquals(490, seg.largestTimestamp());
@@ -394,7 +401,7 @@ public class LogSegmentTest {
     public void testNextOffsetCalculation() throws IOException {
         try (LogSegment seg = createSegment(40)) {
             assertEquals(40, seg.readNextOffset());
-            seg.append(52, RecordBatch.NO_TIMESTAMP, -1L, v1Records(50, 
"hello", "there", "you"));
+            seg.append(52, v1Records(50, "hello", "there", "you"));
             assertEquals(53, seg.readNextOffset());
         }
     }
@@ -437,7 +444,7 @@ public class LogSegmentTest {
     public void testRecoveryFixesCorruptIndex() throws Exception {
         try (LogSegment seg = createSegment(0)) {
             for (int i = 0; i < 100; i++) {
-                seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, v1Records(i, 
Integer.toString(i)));
+                seg.append(i, v1Records(i, Integer.toString(i)));
             }
             File indexFile = seg.offsetIndexFile();
             writeNonsenseToFile(indexFile, 5, (int) indexFile.length());
@@ -460,27 +467,27 @@ public class LogSegmentTest {
             long pid2 = 10L;
 
             // append transactional records from pid1
-            segment.append(101L, RecordBatch.NO_TIMESTAMP,
-                100L, MemoryRecords.withTransactionalRecords(100L, 
Compression.NONE,
+            segment.append(101L,
+                MemoryRecords.withTransactionalRecords(100L, Compression.NONE,
                     pid1, producerEpoch, sequence, partitionLeaderEpoch, new 
SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())));
 
             // append transactional records from pid2
-            segment.append(103L, RecordBatch.NO_TIMESTAMP,
-                102L, MemoryRecords.withTransactionalRecords(102L, 
Compression.NONE,
+            segment.append(103L,
+                MemoryRecords.withTransactionalRecords(102L, Compression.NONE,
                     pid2, producerEpoch, sequence, partitionLeaderEpoch, new 
SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())));
 
             // append non-transactional records
-            segment.append(105L, RecordBatch.NO_TIMESTAMP,
-                104L, MemoryRecords.withRecords(104L, Compression.NONE,
+            segment.append(105L,
+                MemoryRecords.withRecords(104L, Compression.NONE,
                     partitionLeaderEpoch, new SimpleRecord("a".getBytes()), 
new SimpleRecord("b".getBytes())));
 
             // abort the transaction from pid2
-            segment.append(106L, RecordBatch.NO_TIMESTAMP,
-                106L, endTxnRecords(ControlRecordType.ABORT, pid2, 
producerEpoch, 106L));
+            segment.append(106L,
+                endTxnRecords(ControlRecordType.ABORT, pid2, producerEpoch, 
106L));
 
             // commit the transaction from pid1
-            segment.append(107L, RecordBatch.NO_TIMESTAMP,
-                107L, endTxnRecords(ControlRecordType.COMMIT, pid1, 
producerEpoch, 107L));
+            segment.append(107L,
+                endTxnRecords(ControlRecordType.COMMIT, pid1, producerEpoch, 
107L));
 
             ProducerStateManager stateManager = newProducerStateManager();
             segment.recover(stateManager, mock(LeaderEpochFileCache.class));
@@ -522,16 +529,16 @@ public class LogSegmentTest {
             LeaderEpochCheckpointFile checkpoint = new 
LeaderEpochCheckpointFile(TestUtils.tempFile(), new LogDirFailureChannel(1));
 
             LeaderEpochFileCache cache = new 
LeaderEpochFileCache(topicPartition, checkpoint, new MockScheduler(new 
MockTime()));
-            seg.append(105L, RecordBatch.NO_TIMESTAMP, 104L, 
MemoryRecords.withRecords(104L, Compression.NONE, 0,
+            seg.append(105L, MemoryRecords.withRecords(104L, Compression.NONE, 
0,
                 new SimpleRecord("a".getBytes()), new 
SimpleRecord("b".getBytes())));
 
-            seg.append(107L, RecordBatch.NO_TIMESTAMP, 106L, 
MemoryRecords.withRecords(106L, Compression.NONE, 1,
+            seg.append(107L, MemoryRecords.withRecords(106L, Compression.NONE, 
1,
                 new SimpleRecord("a".getBytes()), new 
SimpleRecord("b".getBytes())));
 
-            seg.append(109L, RecordBatch.NO_TIMESTAMP, 108L, 
MemoryRecords.withRecords(108L, Compression.NONE, 1,
+            seg.append(109L, MemoryRecords.withRecords(108L, Compression.NONE, 
1,
                 new SimpleRecord("a".getBytes()), new 
SimpleRecord("b".getBytes())));
 
-            seg.append(111L, RecordBatch.NO_TIMESTAMP, 110L, 
MemoryRecords.withRecords(110L, Compression.NONE, 2,
+            seg.append(111L, MemoryRecords.withRecords(110L, Compression.NONE, 
2,
                 new SimpleRecord("a".getBytes()), new 
SimpleRecord("b".getBytes())));
 
             seg.recover(newProducerStateManager(), cache);
@@ -567,7 +574,7 @@ public class LogSegmentTest {
     public void testRecoveryFixesCorruptTimeIndex() throws IOException {
         try (LogSegment seg = createSegment(0)) {
             for (int i = 0; i < 100; i++) {
-                seg.append(i, i * 10, i, v1Records(i, String.valueOf(i)));
+                seg.append(i, v1Records(i, String.valueOf(i)));
             }
             File timeIndexFile = seg.timeIndexFile();
             writeNonsenseToFile(timeIndexFile, 5, (int) 
timeIndexFile.length());
@@ -590,7 +597,7 @@ public class LogSegmentTest {
         for (int ignore = 0; ignore < 10; ignore++) {
             try (LogSegment seg = createSegment(0)) {
                 for (int i = 0; i < messagesAppended; i++) {
-                    seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, v1Records(i, 
String.valueOf(i)));
+                    seg.append(i, v1Records(i, String.valueOf(i)));
                 }
                 int offsetToBeginCorruption = 
TestUtils.RANDOM.nextInt(messagesAppended);
                 // start corrupting somewhere in the middle of the chosen 
record all the way to the end
@@ -627,9 +634,9 @@ public class LogSegmentTest {
             512 * 1024 * 1024, true, "")) {
             segments.add(seg);
             MemoryRecords ms = v1Records(50, "hello", "there");
-            seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
+            seg.append(51, ms);
             MemoryRecords ms2 = v1Records(60, "alpha", "beta");
-            seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2);
+            seg.append(61, ms2);
             FetchDataInfo read = seg.read(55, 200);
             checkEquals(ms2.records().iterator(), 
read.records.records().iterator());
         }
@@ -650,9 +657,9 @@ public class LogSegmentTest {
 
         try (LogSegment seg = LogSegment.open(tempDir, 40, logConfig, 
Time.SYSTEM, 512 * 1024 * 1024, true)) {
             MemoryRecords ms = v1Records(50, "hello", "there");
-            seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
+            seg.append(51, ms);
             MemoryRecords ms2 = v1Records(60, "alpha", "beta");
-            seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2);
+            seg.append(61, ms2);
             FetchDataInfo read = seg.read(55, 200);
             checkEquals(ms2.records().iterator(), 
read.records.records().iterator());
             long oldSize = seg.log().sizeInBytes();
@@ -690,9 +697,9 @@ public class LogSegmentTest {
 
             // Given two messages with a gap between them (e.g. mid offset 
compacted away)
             MemoryRecords ms1 = recordsForTruncateEven(offset, "first 
message");
-            seg.append(offset, RecordBatch.NO_TIMESTAMP, -1L, ms1);
+            seg.append(offset, ms1);
             MemoryRecords ms2 = recordsForTruncateEven(offset + 3, "message 
after gap");
-            seg.append(offset + 3, RecordBatch.NO_TIMESTAMP, -1L, ms2);
+            seg.append(offset + 3, ms2);
 
             // When we truncate to an offset without a corresponding log entry
             seg.truncateTo(offset + 1);
@@ -743,7 +750,8 @@ public class LogSegmentTest {
         try (LogSegment segment = createSegment(1)) {
             assertEquals(Long.MAX_VALUE, segment.getFirstBatchTimestamp());
 
-            segment.append(1, 1000L, 1, MemoryRecords.withRecords(1, 
Compression.NONE, new SimpleRecord("one".getBytes())));
+            segment.append(1,
+                MemoryRecords.withRecords(1, Compression.NONE, new 
SimpleRecord(1000L, "one".getBytes())));
             assertEquals(1000L, segment.getFirstBatchTimestamp());
         }
     }
@@ -780,6 +788,77 @@ public class LogSegmentTest {
         }
     }
 
+    @Test
+    public void testIndexForMultipleBatchesInMemoryRecords() throws 
IOException {
+        LogSegment segment = createSegment(0, 1, Time.SYSTEM); // NOTE(review): the 1 is presumably the index interval in bytes, small enough that every batch exceeds it - confirm against createSegment
+
+        ByteBuffer buffer1 = ByteBuffer.allocate(1024);
+        // first of two batches packed into buffer1: base offset 0, record timestamp 0
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer1, 
Compression.NONE, TimestampType.CREATE_TIME, 0);
+        builder.append(0L, "key1".getBytes(), "value1".getBytes());
+        builder.close();
+
+        // second batch written into the same buffer: base offset 1, record timestamp 1
+        builder = MemoryRecords.builder(buffer1, Compression.NONE, 
TimestampType.CREATE_TIME, 1);
+        builder.append(1L, "key1".getBytes(), "value1".getBytes());
+        builder.close();
+
+        buffer1.flip();
+        MemoryRecords record = MemoryRecords.readableRecords(buffer1);
+        segment.append(1L, record); // one append carrying both batches; 1L is the last offset they cover
+
+        ByteBuffer buffer2 = ByteBuffer.allocate(1024);
+        // a separate buffer holding a single batch (base offset 2, record timestamp 2) for a second append
+        builder = MemoryRecords.builder(buffer2, Compression.NONE, 
TimestampType.CREATE_TIME, 2);
+        builder.append(2L, "key1".getBytes(), "value1".getBytes());
+        builder.close();
+
+        buffer2.flip();
+        record = MemoryRecords.readableRecords(buffer2);
+        segment.append(2L, record);
+
+        assertEquals(2, segment.offsetIndex().entries()); // two entries expected; none for the very first batch at offset 0
+        assertEquals(1, segment.offsetIndex().entry(0).offset); // the second batch of the first append gets its own entry
+        assertEquals(2, segment.offsetIndex().entry(1).offset); // the batch from the second append
+
+        assertEquals(2, segment.timeIndex().entries()); // the time index is expected to hold the same number of entries
+        assertEquals(new TimestampOffset(1, 1), segment.timeIndex().entry(0)); // timestamp 1 at offset 1
+        assertEquals(new TimestampOffset(2, 2), segment.timeIndex().entry(1)); // timestamp 2 at offset 2
+    }
+
+    @Test
+    public void testNonMonotonicTimestampForMultipleBatchesInMemoryRecords() 
throws IOException {
+        LogSegment segment = createSegment(0, 1, Time.SYSTEM); // NOTE(review): the 1 is presumably the index interval in bytes - confirm against createSegment
+
+        ByteBuffer buffer1 = ByteBuffer.allocate(1024);
+        // three batches share buffer1; the first has base offset 0 and record timestamp 1
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer1, 
Compression.NONE, TimestampType.CREATE_TIME, 0);
+        builder.append(1L, "key1".getBytes(), "value1".getBytes());
+        builder.close();
+
+        // second batch: base offset 1, record timestamp 0, i.e. older than the batch before it
+        builder = MemoryRecords.builder(buffer1, Compression.NONE, 
TimestampType.CREATE_TIME, 1);
+        builder.append(0L, "key1".getBytes(), "value1".getBytes());
+        builder.close();
+
+        // third batch: base offset 2, record timestamp 2
+        builder = MemoryRecords.builder(buffer1, Compression.NONE, 
TimestampType.CREATE_TIME, 2);
+        builder.append(2L, "key1".getBytes(), "value1".getBytes());
+        builder.close();
+
+        buffer1.flip();
+        MemoryRecords record = MemoryRecords.readableRecords(buffer1);
+        segment.append(2L, record); // all three batches go in through a single append; 2L is the last offset they cover
+
+        assertEquals(2, segment.offsetIndex().entries()); // two entries expected; none for the first batch at offset 0
+        assertEquals(1, segment.offsetIndex().entry(0).offset);
+        assertEquals(2, segment.offsetIndex().entry(1).offset);
+
+        assertEquals(2, segment.timeIndex().entries()); // the out-of-order timestamp 0 does not appear in the time index
+        assertEquals(new TimestampOffset(1, 0), segment.timeIndex().entry(0)); // NOTE(review): presumably the largest timestamp seen so far (1) with the offset that carried it (0) - confirm in LogSegment.append
+        assertEquals(new TimestampOffset(2, 2), segment.timeIndex().entry(1)); // timestamp 2 at offset 2
+    }
+
     private ProducerStateManager newProducerStateManager() throws IOException {
         return new ProducerStateManager(
             topicPartition,
diff --git 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java
 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java
index c58b15257ec..3bef886cee6 100644
--- 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java
@@ -240,7 +240,6 @@ public class LogValidatorTest {
         }
 
         assertEquals(timestamp, validatedResults.maxTimestampMs);
-        assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp, "Offset 
of max timestamp should be the last offset 2.");
         assertTrue(validatedResults.messageSizeMaybeChanged, "Message size 
should have been changed");
 
         verifyRecordValidationStats(
@@ -287,7 +286,6 @@ public class LogValidatorTest {
         }
         assertEquals(RecordBatch.NO_TIMESTAMP, validatedResults.maxTimestampMs,
                 "Max timestamp should be " + RecordBatch.NO_TIMESTAMP);
-        assertEquals(-1, validatedResults.shallowOffsetOfMaxTimestamp);
         assertTrue(validatedResults.messageSizeMaybeChanged, "Message size 
should have been changed");
 
         verifyRecordValidationStats(validatedResults.recordValidationStats, 3, 
records, true);
@@ -383,7 +381,6 @@ public class LogValidatorTest {
         // Both V2 and V1 have single batch in the validated records when 
compression is enabled, and hence their shallow
         // OffsetOfMaxTimestamp is the last offset of the single batch
         assertEquals(1, iteratorSize(validatedRecords.batches().iterator()));
-        assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp);
         assertTrue(validatingResults.messageSizeMaybeChanged,
                 "Message size should have been changed");
 
@@ -571,8 +568,6 @@ public class LogValidatorTest {
 
         assertEquals(now + 1, validatedResults.maxTimestampMs, "Max timestamp 
should be " + (now + 1));
 
-        int expectedShallowOffsetOfMaxTimestamp = 2;
-        assertEquals(expectedShallowOffsetOfMaxTimestamp, 
validatedResults.shallowOffsetOfMaxTimestamp, "Shallow offset of max timestamp 
should be 2");
         assertFalse(validatedResults.messageSizeMaybeChanged, "Message size 
should not have been changed");
 
         verifyRecordValidationStats(validatedResults.recordValidationStats, 0, 
records, true);
@@ -1831,10 +1826,8 @@ public class LogValidatorTest {
 
         if (magic >= RecordBatch.MAGIC_VALUE_V2) {
             assertEquals(1, iteratorSize(records.batches().iterator()));
-            assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp);
         } else {
             assertEquals(3, iteratorSize(records.batches().iterator()));
-            assertEquals(1, validatingResults.shallowOffsetOfMaxTimestamp);
         }
 
         assertFalse(validatingResults.messageSizeMaybeChanged,
@@ -1908,8 +1901,6 @@ public class LogValidatorTest {
                 "MessageSet should still valid");
         assertEquals(now, validatedResults.maxTimestampMs,
                 "Max timestamp should be " + now);
-        assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp,
-                "The shallow offset of max timestamp should be the last offset 
2 if logAppendTime is used");
         assertFalse(validatedResults.messageSizeMaybeChanged,
                 "Message size should not have been changed");
 
@@ -1950,8 +1941,6 @@ public class LogValidatorTest {
         assertTrue(validatedRecords.batches().iterator().next().isValid(),
                 "MessageSet should still valid");
         assertEquals(now, validatedResults.maxTimestampMs, String.format("Max 
timestamp should be %d", now));
-        assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp,
-                "The shallow offset of max timestamp should be 2 if 
logAppendTime is used");
         assertTrue(validatedResults.messageSizeMaybeChanged,
                 "Message size may have been changed");
 
@@ -2002,19 +1991,6 @@ public class LogValidatorTest {
 
         assertFalse(validatedResults.messageSizeMaybeChanged, "Message size 
should not have been changed");
 
-        int expectedMaxTimestampOffset;
-        switch (magic) {
-            case RecordBatch.MAGIC_VALUE_V0:
-                expectedMaxTimestampOffset = -1;
-                break;
-            case RecordBatch.MAGIC_VALUE_V1:
-                expectedMaxTimestampOffset = 0;
-                break;
-            default:
-                expectedMaxTimestampOffset = 2;
-                break;
-        }
-        assertEquals(expectedMaxTimestampOffset, 
validatedResults.shallowOffsetOfMaxTimestamp);
         verifyRecordValidationStats(validatedResults.recordValidationStats, 0, 
records, false);
     }
 

Reply via email to