This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 568b9e8a6ce KAFKA-17803: LogSegment#read should return the base offset 
of the batch that contains startOffset rather than startOffset (#17528)
568b9e8a6ce is described below

commit 568b9e8a6ce9d7982ed4d3da35eaf26dd590f006
Author: kevin-wu24 <[email protected]>
AuthorDate: Fri Nov 1 09:32:00 2024 -0700

    KAFKA-17803: LogSegment#read should return the base offset of the batch 
that contains startOffset rather than startOffset (#17528)
    
    Reviewers: Jose Sancio <[email protected]>, Jun Rao <[email protected]>
---
 .../apache/kafka/common/record/FileRecords.java    |  8 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   |  4 +-
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala | 34 ++++++---
 .../kafka/storage/internals/log/LogSegment.java    | 14 ++--
 .../storage/internals/log/LogSegmentTest.java      | 86 +++++++++++++---------
 5 files changed, 88 insertions(+), 58 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java 
b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 84eb6f20ac5..64dd73de412 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -306,18 +306,18 @@ public class FileRecords extends AbstractRecords 
implements Closeable {
     }
 
     /**
-     * Search forward for the file position of the last offset that is greater 
than or equal to the target offset
-     * and return its physical position and the size of the message (including 
log overhead) at the returned offset. If
-     * no such offsets are found, return null.
+     * Search forward for the file position of the message batch whose last 
offset is greater
+     * than or equal to the target offset. If no such batch is found, return 
null.
      *
      * @param targetOffset The offset to search for.
      * @param startingPosition The starting position in the file to begin 
searching from.
+     * @return the batch's base offset, its physical position, and its size 
(including log overhead)
      */
     public LogOffsetPosition searchForOffsetWithSize(long targetOffset, int 
startingPosition) {
         for (FileChannelRecordBatch batch : batchesFrom(startingPosition)) {
             long offset = batch.lastOffset();
             if (offset >= targetOffset)
-                return new LogOffsetPosition(offset, batch.position(), 
batch.sizeInBytes());
+                return new LogOffsetPosition(batch.baseOffset(), 
batch.position(), batch.sizeInBytes());
         }
         return null;
     }
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala 
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 7023fc270a5..fbaf556df7d 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -1149,8 +1149,8 @@ class PartitionTest extends AbstractPartitionTest {
 
     // let the follower in ISR move leader's HW to move further but below LEO
     fetchFollower(partition, replicaId = follower2, fetchOffset = 0)
-    fetchFollower(partition, replicaId = follower2, fetchOffset = 
lastOffsetOfFirstBatch)
-    assertEquals(lastOffsetOfFirstBatch, partition.log.get.highWatermark, 
"Expected leader's HW")
+    fetchFollower(partition, replicaId = follower2, fetchOffset = 
lastOffsetOfFirstBatch + 1)
+    assertEquals(lastOffsetOfFirstBatch + 1, partition.log.get.highWatermark, 
"Expected leader's HW")
 
     // current leader becomes follower and then leader again (without any new 
records appended)
     val followerState = new LeaderAndIsrPartitionState()
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala 
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index b3553c5b8ed..61a93102a02 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -58,6 +58,7 @@ import java.nio.file.Files
 import java.util.concurrent.{Callable, ConcurrentHashMap, Executors, TimeUnit}
 import java.util.{Optional, OptionalLong, Properties}
 import scala.annotation.nowarn
+import scala.collection.immutable.SortedSet
 import scala.collection.mutable.ListBuffer
 import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters.{RichOptional, RichOptionalInt}
@@ -324,7 +325,7 @@ class UnifiedLogTest {
     assertHighWatermark(4L)
   }
 
-  private def assertNonEmptyFetch(log: UnifiedLog, offset: Long, isolation: 
FetchIsolation): Unit = {
+  private def assertNonEmptyFetch(log: UnifiedLog, offset: Long, isolation: 
FetchIsolation, batchBaseOffset: Long): Unit = {
     val readInfo = log.read(startOffset = offset,
       maxLength = Int.MaxValue,
       isolation = isolation,
@@ -342,18 +343,18 @@ class UnifiedLogTest {
     for (record <- readInfo.records.records.asScala)
       assertTrue(record.offset < upperBoundOffset)
 
-    assertEquals(offset, readInfo.fetchOffsetMetadata.messageOffset)
+    assertEquals(batchBaseOffset, readInfo.fetchOffsetMetadata.messageOffset)
     assertValidLogOffsetMetadata(log, readInfo.fetchOffsetMetadata)
   }
 
-  private def assertEmptyFetch(log: UnifiedLog, offset: Long, isolation: 
FetchIsolation): Unit = {
+  private def assertEmptyFetch(log: UnifiedLog, offset: Long, isolation: 
FetchIsolation, batchBaseOffset: Long): Unit = {
     val readInfo = log.read(startOffset = offset,
       maxLength = Int.MaxValue,
       isolation = isolation,
       minOneMessage = true)
     assertFalse(readInfo.firstEntryIncomplete)
     assertEquals(0, readInfo.records.sizeInBytes)
-    assertEquals(offset, readInfo.fetchOffsetMetadata.messageOffset)
+    assertEquals(batchBaseOffset, readInfo.fetchOffsetMetadata.messageOffset)
     assertValidLogOffsetMetadata(log, readInfo.fetchOffsetMetadata)
   }
 
@@ -371,9 +372,11 @@ class UnifiedLogTest {
       new SimpleRecord("3".getBytes),
       new SimpleRecord("4".getBytes)
     )), leaderEpoch = 0)
+    val batchBaseOffsets = SortedSet[Long](0, 3, 5)
 
     (log.logStartOffset until log.logEndOffset).foreach { offset =>
-      assertNonEmptyFetch(log, offset, FetchIsolation.LOG_END)
+      val batchBaseOffset = batchBaseOffsets.rangeTo(offset).lastKey
+      assertNonEmptyFetch(log, offset, FetchIsolation.LOG_END, batchBaseOffset)
     }
   }
 
@@ -391,14 +394,17 @@ class UnifiedLogTest {
       new SimpleRecord("3".getBytes),
       new SimpleRecord("4".getBytes)
     )), leaderEpoch = 0)
+    val batchBaseOffsets = SortedSet[Long](0, 3, 5)
 
     def assertHighWatermarkBoundedFetches(): Unit = {
       (log.logStartOffset until log.highWatermark).foreach { offset =>
-        assertNonEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK)
+        val batchBaseOffset = batchBaseOffsets.rangeTo(offset).lastKey
+        assertNonEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK, 
batchBaseOffset)
       }
 
       (log.highWatermark to log.logEndOffset).foreach { offset =>
-        assertEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK)
+        val batchBaseOffset = batchBaseOffsets.rangeTo(offset).lastKey
+        assertEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK, 
batchBaseOffset)
       }
     }
 
@@ -488,13 +494,17 @@ class UnifiedLogTest {
     LogTestUtils.appendNonTransactionalAsLeader(log, 2)
     appendProducer1(10)
 
+    val batchBaseOffsets = SortedSet[Long](0, 5, 8, 10, 14, 16, 26, 27, 28)
+
     def assertLsoBoundedFetches(): Unit = {
       (log.logStartOffset until log.lastStableOffset).foreach { offset =>
-        assertNonEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED)
+        val batchBaseOffset = batchBaseOffsets.rangeTo(offset).lastKey
+        assertNonEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED, 
batchBaseOffset)
       }
 
       (log.lastStableOffset to log.logEndOffset).foreach { offset =>
-        assertEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED)
+        val batchBaseOffset = batchBaseOffsets.rangeTo(offset).lastKey
+        assertEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED, 
batchBaseOffset)
       }
     }
 
@@ -3464,13 +3474,13 @@ class UnifiedLogTest {
       new SimpleRecord("c".getBytes)), 5)
 
 
-    log.updateHighWatermark(2L)
+    log.updateHighWatermark(3L)
     var offsets: LogOffsetSnapshot = log.fetchOffsetSnapshot
-    assertEquals(offsets.highWatermark.messageOffset, 2L)
+    assertEquals(offsets.highWatermark.messageOffset, 3L)
     assertFalse(offsets.highWatermark.messageOffsetOnly)
 
     offsets = log.fetchOffsetSnapshot
-    assertEquals(offsets.highWatermark.messageOffset, 2L)
+    assertEquals(offsets.highWatermark.messageOffset, 3L)
     assertFalse(offsets.highWatermark.messageOffsetOnly)
   }
 
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
index 2b8e6b3aa65..faf0ece6067 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
@@ -372,7 +372,7 @@ public class LogSegment implements Closeable {
     }
 
     /**
-     * Find the physical file position for the first message with offset >= 
the requested offset.
+     * Find the physical file position for the message batch that contains the 
requested offset.
      *
      * The startingFilePosition argument is an optimization that can be used 
if we already know a valid starting position
      * in the file higher than the greatest-lower-bound from the index.
@@ -382,8 +382,8 @@ public class LogSegment implements Closeable {
      * @param offset The offset we want to translate
      * @param startingFilePosition A lower bound on the file position from 
which to begin the search. This is purely an optimization and
      * when omitted, the search will begin at the position in the offset index.
-     * @return The position in the log storing the message with the least 
offset >= the requested offset and the size of the
-     *        message or null if no message meets this criteria.
+     * @return The base offset, position in the log, and size of the message 
batch that contains the requested offset,
+     * or null if no such batch is found.
      */
     LogOffsetPosition translateOffset(long offset, int startingFilePosition) 
throws IOException {
         OffsetPosition mapping = offsetIndex().lookup(offset);
@@ -409,17 +409,17 @@ public class LogSegment implements Closeable {
     }
 
     /**
-     * Read a message set from this segment beginning with the first offset >= 
startOffset. The message set will include
+     * Read a message set from this segment that contains startOffset. The 
message set will include
      * no more than maxSize bytes and will end before maxOffset if a maxOffset 
is specified.
      *
      * This method is thread-safe.
      *
-     * @param startOffset A lower bound on the first offset to include in the 
message set we read
+     * @param startOffset The logical log offset we are trying to read
      * @param maxSize The maximum number of bytes to include in the message 
set we read
      * @param maxPositionOpt The maximum position in the log segment that 
should be exposed for read
      * @param minOneMessage If this is true, the first message will be 
returned even if it exceeds `maxSize` (if one exists)
      *
-     * @return The fetched data and the offset metadata of the first message 
whose offset is >= startOffset,
+     * @return The fetched data and the base offset metadata of the message 
batch that contains startOffset,
      *         or null if the startOffset is larger than the largest offset in 
this log
      */
     public FetchDataInfo read(long startOffset, int maxSize, Optional<Long> 
maxPositionOpt, boolean minOneMessage) throws IOException {
@@ -433,7 +433,7 @@ public class LogSegment implements Closeable {
             return null;
 
         int startPosition = startOffsetAndSize.position;
-        LogOffsetMetadata offsetMetadata = new LogOffsetMetadata(startOffset, 
this.baseOffset, startPosition);
+        LogOffsetMetadata offsetMetadata = new 
LogOffsetMetadata(startOffsetAndSize.offset, this.baseOffset, startPosition);
 
         int adjustedMaxSize = maxSize;
         if (minOneMessage)
diff --git 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
index 695c19d4208..616671a6549 100644
--- 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
@@ -93,7 +93,7 @@ public class LogSegmentTest {
     }
 
     /* Create a ByteBufferMessageSet for the given messages starting from the 
given offset */
-    private MemoryRecords records(long offset, String... records) {
+    private MemoryRecords v1Records(long offset, String... records) {
         List<SimpleRecord> simpleRecords = new ArrayList<>();
         for (String s : records) {
             simpleRecords.add(new SimpleRecord(offset * 10, s.getBytes()));
@@ -103,6 +103,16 @@ public class LogSegmentTest {
             Compression.NONE, TimestampType.CREATE_TIME, 
simpleRecords.toArray(new SimpleRecord[0]));
     }
 
+    private MemoryRecords v2Records(long offset, String... records) {
+        List<SimpleRecord> simpleRecords = new ArrayList<>();
+        for (String s : records) {
+            simpleRecords.add(new SimpleRecord(offset * 10, s.getBytes()));
+        }
+        return MemoryRecords.withRecords(
+                RecordBatch.MAGIC_VALUE_V2, offset,
+                Compression.NONE, TimestampType.CREATE_TIME, 
simpleRecords.toArray(new SimpleRecord[0]));
+    }
+
     @BeforeEach
     public void setup() {
         logDir = TestUtils.tempDirectory();
@@ -134,7 +144,7 @@ public class LogSegmentTest {
     public void testAppendForLogSegmentOffsetOverflowException(long 
baseOffset, long largestOffset) throws IOException {
         try (LogSegment seg = createSegment(baseOffset, 10, Time.SYSTEM)) {
             long currentTime = Time.SYSTEM.milliseconds();
-            MemoryRecords memoryRecords = records(0, "hello");
+            MemoryRecords memoryRecords = v1Records(0, "hello");
             assertThrows(LogSegmentOffsetOverflowException.class, () -> 
seg.append(largestOffset, currentTime, largestOffset, memoryRecords));
         }
     }
@@ -157,20 +167,35 @@ public class LogSegmentTest {
     @Test
     public void testReadBeforeFirstOffset() throws IOException {
         try (LogSegment seg = createSegment(40)) {
-            MemoryRecords ms = records(50, "hello", "there", "little", "bee");
+            MemoryRecords ms = v1Records(50, "hello", "there", "little", 
"bee");
             seg.append(53, RecordBatch.NO_TIMESTAMP, -1L, ms);
             Records read = seg.read(41, 300).records;
             checkEquals(ms.records().iterator(), read.records().iterator());
         }
     }
 
+    /**
+     * Reading from an offset in the middle of a batch should return a
+     * LogOffsetMetadata offset that points to the batch's base offset
+     */
+    @Test
+    public void testReadFromMiddleOfBatch() throws IOException {
+        long batchBaseOffset = 50;
+        try (LogSegment seg = createSegment(40)) {
+            MemoryRecords ms = v2Records(batchBaseOffset, "hello", "there", 
"little", "bee");
+            seg.append(53, RecordBatch.NO_TIMESTAMP, -1L, ms);
+            FetchDataInfo readInfo = seg.read(52, 300);
+            assertEquals(batchBaseOffset, 
readInfo.fetchOffsetMetadata.messageOffset);
+        }
+    }
+
     /**
      * If we read from an offset beyond the last offset in the segment we 
should get null
      */
     @Test
     public void testReadAfterLast() throws IOException {
         try (LogSegment seg = createSegment(40)) {
-            MemoryRecords ms = records(50, "hello", "there");
+            MemoryRecords ms = v1Records(50, "hello", "there");
             seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
             FetchDataInfo read = seg.read(52, 200);
             assertNull(read, "Read beyond the last offset in the segment 
should give null");
@@ -184,9 +209,9 @@ public class LogSegmentTest {
     @Test
     public void testReadFromGap() throws IOException {
         try (LogSegment seg = createSegment(40)) {
-            MemoryRecords ms = records(50, "hello", "there");
+            MemoryRecords ms = v1Records(50, "hello", "there");
             seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
-            MemoryRecords ms2 = records(60, "alpha", "beta");
+            MemoryRecords ms2 = v1Records(60, "alpha", "beta");
             seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2);
             FetchDataInfo read = seg.read(55, 200);
             checkEquals(ms2.records().iterator(), 
read.records.records().iterator());
@@ -199,16 +224,11 @@ public class LogSegmentTest {
         Optional<Long> maxPosition = Optional.empty();
         int maxSize = 1;
         try (LogSegment seg = createSegment(40)) {
-            MemoryRecords ms = records(50, "hello", "there");
+            MemoryRecords ms = v1Records(50, "hello", "there");
             seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
 
-            // read before first offset
-            FetchDataInfo read = seg.read(48, maxSize, maxPosition, 
minOneMessage);
-            assertEquals(new LogOffsetMetadata(48, 40, 0), 
read.fetchOffsetMetadata);
-            assertFalse(read.records.records().iterator().hasNext());
-
             // read at first offset
-            read = seg.read(50, maxSize, maxPosition, minOneMessage);
+            FetchDataInfo read = seg.read(50, maxSize, maxPosition, 
minOneMessage);
             assertEquals(new LogOffsetMetadata(50, 40, 0), 
read.fetchOffsetMetadata);
             assertFalse(read.records.records().iterator().hasNext());
 
@@ -236,9 +256,9 @@ public class LogSegmentTest {
         try (LogSegment seg = createSegment(40)) {
             long offset = 40;
             for (int i = 0; i < 30; i++) {
-                MemoryRecords ms1 = records(offset, "hello");
+                MemoryRecords ms1 = v1Records(offset, "hello");
                 seg.append(offset, RecordBatch.NO_TIMESTAMP, -1L, ms1);
-                MemoryRecords ms2 = records(offset + 1, "hello");
+                MemoryRecords ms2 = v1Records(offset + 1, "hello");
                 seg.append(offset + 1, RecordBatch.NO_TIMESTAMP, -1L, ms2);
 
                 // check that we can read back both messages
@@ -297,10 +317,10 @@ public class LogSegmentTest {
     @Test
     public void testReloadLargestTimestampAndNextOffsetAfterTruncation() 
throws IOException {
         int numMessages = 30;
-        try (LogSegment seg = createSegment(40, 2 * records(0, 
"hello").sizeInBytes() - 1)) {
+        try (LogSegment seg = createSegment(40, 2 * v1Records(0, 
"hello").sizeInBytes() - 1)) {
             int offset = 40;
             for (int i = 0; i < numMessages; i++) {
-                seg.append(offset, offset, offset, records(offset, "hello"));
+                seg.append(offset, offset, offset, v1Records(offset, "hello"));
                 offset++;
             }
             assertEquals(offset, seg.readNextOffset());
@@ -323,7 +343,7 @@ public class LogSegmentTest {
         MockTime time = new MockTime();
         try (LogSegment seg = createSegment(40, time)) {
 
-            seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", 
"there"));
+            seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, v1Records(40, 
"hello", "there"));
 
             // If the segment is empty after truncation, the create time 
should be reset
             time.sleep(500);
@@ -335,7 +355,7 @@ public class LogSegmentTest {
             assertFalse(seg.offsetIndex().isFull());
             assertNull(seg.read(0, 1024), "Segment should be empty.");
 
-            seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", 
"there"));
+            seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, v1Records(40, 
"hello", "there"));
         }
     }
 
@@ -344,11 +364,11 @@ public class LogSegmentTest {
      */
     @Test
     public void testFindOffsetByTimestamp() throws IOException {
-        int messageSize = records(0, "msg00").sizeInBytes();
+        int messageSize = v1Records(0, "msg00").sizeInBytes();
         try (LogSegment seg = createSegment(40, messageSize * 2 - 1)) {
             // Produce some messages
             for (int i = 40; i < 50; i++) {
-                seg.append(i, i * 10, i, records(i, "msg" + i));
+                seg.append(i, i * 10, i, v1Records(i, "msg" + i));
             }
 
             assertEquals(490, seg.largestTimestamp());
@@ -374,7 +394,7 @@ public class LogSegmentTest {
     public void testNextOffsetCalculation() throws IOException {
         try (LogSegment seg = createSegment(40)) {
             assertEquals(40, seg.readNextOffset());
-            seg.append(52, RecordBatch.NO_TIMESTAMP, -1L, records(50, "hello", 
"there", "you"));
+            seg.append(52, RecordBatch.NO_TIMESTAMP, -1L, v1Records(50, 
"hello", "there", "you"));
             assertEquals(53, seg.readNextOffset());
         }
     }
@@ -417,7 +437,7 @@ public class LogSegmentTest {
     public void testRecoveryFixesCorruptIndex() throws Exception {
         try (LogSegment seg = createSegment(0)) {
             for (int i = 0; i < 100; i++) {
-                seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, records(i, 
Integer.toString(i)));
+                seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, v1Records(i, 
Integer.toString(i)));
             }
             File indexFile = seg.offsetIndexFile();
             writeNonsenseToFile(indexFile, 5, (int) indexFile.length());
@@ -547,7 +567,7 @@ public class LogSegmentTest {
     public void testRecoveryFixesCorruptTimeIndex() throws IOException {
         try (LogSegment seg = createSegment(0)) {
             for (int i = 0; i < 100; i++) {
-                seg.append(i, i * 10, i, records(i, String.valueOf(i)));
+                seg.append(i, i * 10, i, v1Records(i, String.valueOf(i)));
             }
             File timeIndexFile = seg.timeIndexFile();
             writeNonsenseToFile(timeIndexFile, 5, (int) 
timeIndexFile.length());
@@ -570,7 +590,7 @@ public class LogSegmentTest {
         for (int ignore = 0; ignore < 10; ignore++) {
             try (LogSegment seg = createSegment(0)) {
                 for (int i = 0; i < messagesAppended; i++) {
-                    seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, records(i, 
String.valueOf(i)));
+                    seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, v1Records(i, 
String.valueOf(i)));
                 }
                 int offsetToBeginCorruption = 
TestUtils.RANDOM.nextInt(messagesAppended);
                 // start corrupting somewhere in the middle of the chosen 
record all the way to the end
@@ -606,9 +626,9 @@ public class LogSegmentTest {
         try (LogSegment seg = LogSegment.open(tempDir, 40, logConfig, 
Time.SYSTEM, false,
             512 * 1024 * 1024, true, "")) {
             segments.add(seg);
-            MemoryRecords ms = records(50, "hello", "there");
+            MemoryRecords ms = v1Records(50, "hello", "there");
             seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
-            MemoryRecords ms2 = records(60, "alpha", "beta");
+            MemoryRecords ms2 = v1Records(60, "alpha", "beta");
             seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2);
             FetchDataInfo read = seg.read(55, 200);
             checkEquals(ms2.records().iterator(), 
read.records.records().iterator());
@@ -629,9 +649,9 @@ public class LogSegmentTest {
         LogConfig logConfig = new LogConfig(configMap);
 
         try (LogSegment seg = LogSegment.open(tempDir, 40, logConfig, 
Time.SYSTEM, 512 * 1024 * 1024, true)) {
-            MemoryRecords ms = records(50, "hello", "there");
+            MemoryRecords ms = v1Records(50, "hello", "there");
             seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
-            MemoryRecords ms2 = records(60, "alpha", "beta");
+            MemoryRecords ms2 = v1Records(60, "alpha", "beta");
             seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2);
             FetchDataInfo read = seg.read(55, 200);
             checkEquals(ms2.records().iterator(), 
read.records.records().iterator());
@@ -685,7 +705,7 @@ public class LogSegmentTest {
         }
     }
 
-    private MemoryRecords records(long offset, int size) {
+    private MemoryRecords v2RecordWithSize(long offset, int size) {
         return MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, offset, 
Compression.NONE, TimestampType.CREATE_TIME,
             new SimpleRecord(new byte[size]));
     }
@@ -697,10 +717,10 @@ public class LogSegmentTest {
         FileRecords fileRecords = 
FileRecords.open(LogFileUtils.logFile(tempDir, 0));
 
         // Simulate a scenario with log offset range exceeding 
Integer.MAX_VALUE
-        fileRecords.append(records(0, 1024));
-        fileRecords.append(records(500, 1024 * 1024 + 1));
+        fileRecords.append(v2RecordWithSize(0, 1024));
+        fileRecords.append(v2RecordWithSize(500, 1024 * 1024 + 1));
         long sizeBeforeOverflow = fileRecords.sizeInBytes();
-        fileRecords.append(records(Integer.MAX_VALUE + 5L, 1024));
+        fileRecords.append(v2RecordWithSize(Integer.MAX_VALUE + 5L, 1024));
         long sizeAfterOverflow = fileRecords.sizeInBytes();
 
         try (LogSegment segment = createSegment(0)) {

Reply via email to