This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 568b9e8a6ce KAFKA-17803: LogSegment#read should return the base offset
of the batch that contains startOffset rather than startOffset (#17528)
568b9e8a6ce is described below
commit 568b9e8a6ce9d7982ed4d3da35eaf26dd590f006
Author: kevin-wu24 <[email protected]>
AuthorDate: Fri Nov 1 09:32:00 2024 -0700
KAFKA-17803: LogSegment#read should return the base offset of the batch
that contains startOffset rather than startOffset (#17528)
Reviewers: Jose Sancio <[email protected]>, Jun Rao <[email protected]>
---
.../apache/kafka/common/record/FileRecords.java | 8 +-
.../scala/unit/kafka/cluster/PartitionTest.scala | 4 +-
.../test/scala/unit/kafka/log/UnifiedLogTest.scala | 34 ++++++---
.../kafka/storage/internals/log/LogSegment.java | 14 ++--
.../storage/internals/log/LogSegmentTest.java | 86 +++++++++++++---------
5 files changed, 88 insertions(+), 58 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 84eb6f20ac5..64dd73de412 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -306,18 +306,18 @@ public class FileRecords extends AbstractRecords
implements Closeable {
}
/**
- * Search forward for the file position of the last offset that is greater
than or equal to the target offset
- * and return its physical position and the size of the message (including
log overhead) at the returned offset. If
- * no such offsets are found, return null.
+ * Search forward for the file position of the message batch whose last
offset that is greater
+ * than or equal to the target offset. If no such batch is found, return
null.
*
* @param targetOffset The offset to search for.
* @param startingPosition The starting position in the file to begin
searching from.
+ * @return the batch's base offset, its physical position, and its size
(including log overhead)
*/
public LogOffsetPosition searchForOffsetWithSize(long targetOffset, int
startingPosition) {
for (FileChannelRecordBatch batch : batchesFrom(startingPosition)) {
long offset = batch.lastOffset();
if (offset >= targetOffset)
- return new LogOffsetPosition(offset, batch.position(),
batch.sizeInBytes());
+ return new LogOffsetPosition(batch.baseOffset(),
batch.position(), batch.sizeInBytes());
}
return null;
}
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 7023fc270a5..fbaf556df7d 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -1149,8 +1149,8 @@ class PartitionTest extends AbstractPartitionTest {
// let the follower in ISR move leader's HW to move further but below LEO
fetchFollower(partition, replicaId = follower2, fetchOffset = 0)
- fetchFollower(partition, replicaId = follower2, fetchOffset =
lastOffsetOfFirstBatch)
- assertEquals(lastOffsetOfFirstBatch, partition.log.get.highWatermark,
"Expected leader's HW")
+ fetchFollower(partition, replicaId = follower2, fetchOffset =
lastOffsetOfFirstBatch + 1)
+ assertEquals(lastOffsetOfFirstBatch + 1, partition.log.get.highWatermark,
"Expected leader's HW")
// current leader becomes follower and then leader again (without any new
records appended)
val followerState = new LeaderAndIsrPartitionState()
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index b3553c5b8ed..61a93102a02 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -58,6 +58,7 @@ import java.nio.file.Files
import java.util.concurrent.{Callable, ConcurrentHashMap, Executors, TimeUnit}
import java.util.{Optional, OptionalLong, Properties}
import scala.annotation.nowarn
+import scala.collection.immutable.SortedSet
import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.{RichOptional, RichOptionalInt}
@@ -324,7 +325,7 @@ class UnifiedLogTest {
assertHighWatermark(4L)
}
- private def assertNonEmptyFetch(log: UnifiedLog, offset: Long, isolation:
FetchIsolation): Unit = {
+ private def assertNonEmptyFetch(log: UnifiedLog, offset: Long, isolation:
FetchIsolation, batchBaseOffset: Long): Unit = {
val readInfo = log.read(startOffset = offset,
maxLength = Int.MaxValue,
isolation = isolation,
@@ -342,18 +343,18 @@ class UnifiedLogTest {
for (record <- readInfo.records.records.asScala)
assertTrue(record.offset < upperBoundOffset)
- assertEquals(offset, readInfo.fetchOffsetMetadata.messageOffset)
+ assertEquals(batchBaseOffset, readInfo.fetchOffsetMetadata.messageOffset)
assertValidLogOffsetMetadata(log, readInfo.fetchOffsetMetadata)
}
- private def assertEmptyFetch(log: UnifiedLog, offset: Long, isolation:
FetchIsolation): Unit = {
+ private def assertEmptyFetch(log: UnifiedLog, offset: Long, isolation:
FetchIsolation, batchBaseOffset: Long): Unit = {
val readInfo = log.read(startOffset = offset,
maxLength = Int.MaxValue,
isolation = isolation,
minOneMessage = true)
assertFalse(readInfo.firstEntryIncomplete)
assertEquals(0, readInfo.records.sizeInBytes)
- assertEquals(offset, readInfo.fetchOffsetMetadata.messageOffset)
+ assertEquals(batchBaseOffset, readInfo.fetchOffsetMetadata.messageOffset)
assertValidLogOffsetMetadata(log, readInfo.fetchOffsetMetadata)
}
@@ -371,9 +372,11 @@ class UnifiedLogTest {
new SimpleRecord("3".getBytes),
new SimpleRecord("4".getBytes)
)), leaderEpoch = 0)
+ val batchBaseOffsets = SortedSet[Long](0, 3, 5)
(log.logStartOffset until log.logEndOffset).foreach { offset =>
- assertNonEmptyFetch(log, offset, FetchIsolation.LOG_END)
+ val batchBaseOffset = batchBaseOffsets.rangeTo(offset).lastKey
+ assertNonEmptyFetch(log, offset, FetchIsolation.LOG_END, batchBaseOffset)
}
}
@@ -391,14 +394,17 @@ class UnifiedLogTest {
new SimpleRecord("3".getBytes),
new SimpleRecord("4".getBytes)
)), leaderEpoch = 0)
+ val batchBaseOffsets = SortedSet[Long](0, 3, 5)
def assertHighWatermarkBoundedFetches(): Unit = {
(log.logStartOffset until log.highWatermark).foreach { offset =>
- assertNonEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK)
+ val batchBaseOffset = batchBaseOffsets.rangeTo(offset).lastKey
+ assertNonEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK,
batchBaseOffset)
}
(log.highWatermark to log.logEndOffset).foreach { offset =>
- assertEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK)
+ val batchBaseOffset = batchBaseOffsets.rangeTo(offset).lastKey
+ assertEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK,
batchBaseOffset)
}
}
@@ -488,13 +494,17 @@ class UnifiedLogTest {
LogTestUtils.appendNonTransactionalAsLeader(log, 2)
appendProducer1(10)
+ val batchBaseOffsets = SortedSet[Long](0, 5, 8, 10, 14, 16, 26, 27, 28)
+
def assertLsoBoundedFetches(): Unit = {
(log.logStartOffset until log.lastStableOffset).foreach { offset =>
- assertNonEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED)
+ val batchBaseOffset = batchBaseOffsets.rangeTo(offset).lastKey
+ assertNonEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED,
batchBaseOffset)
}
(log.lastStableOffset to log.logEndOffset).foreach { offset =>
- assertEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED)
+ val batchBaseOffset = batchBaseOffsets.rangeTo(offset).lastKey
+ assertEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED,
batchBaseOffset)
}
}
@@ -3464,13 +3474,13 @@ class UnifiedLogTest {
new SimpleRecord("c".getBytes)), 5)
- log.updateHighWatermark(2L)
+ log.updateHighWatermark(3L)
var offsets: LogOffsetSnapshot = log.fetchOffsetSnapshot
- assertEquals(offsets.highWatermark.messageOffset, 2L)
+ assertEquals(offsets.highWatermark.messageOffset, 3L)
assertFalse(offsets.highWatermark.messageOffsetOnly)
offsets = log.fetchOffsetSnapshot
- assertEquals(offsets.highWatermark.messageOffset, 2L)
+ assertEquals(offsets.highWatermark.messageOffset, 3L)
assertFalse(offsets.highWatermark.messageOffsetOnly)
}
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
index 2b8e6b3aa65..faf0ece6067 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
@@ -372,7 +372,7 @@ public class LogSegment implements Closeable {
}
/**
- * Find the physical file position for the first message with offset >=
the requested offset.
+ * Find the physical file position for the message batch that contains the
requested offset.
*
* The startingFilePosition argument is an optimization that can be used
if we already know a valid starting position
* in the file higher than the greatest-lower-bound from the index.
@@ -382,8 +382,8 @@ public class LogSegment implements Closeable {
* @param offset The offset we want to translate
* @param startingFilePosition A lower bound on the file position from
which to begin the search. This is purely an optimization and
* when omitted, the search will begin at the position in the offset index.
- * @return The position in the log storing the message with the least
offset >= the requested offset and the size of the
- * message or null if no message meets this criteria.
+ * @return The base offset, position in the log, and size of the message
batch that contains the requested offset,
+ * or null if no such batch is found.
*/
LogOffsetPosition translateOffset(long offset, int startingFilePosition)
throws IOException {
OffsetPosition mapping = offsetIndex().lookup(offset);
@@ -409,17 +409,17 @@ public class LogSegment implements Closeable {
}
/**
- * Read a message set from this segment beginning with the first offset >=
startOffset. The message set will include
+ * Read a message set from this segment that contains startOffset. The
message set will include
* no more than maxSize bytes and will end before maxOffset if a maxOffset
is specified.
*
* This method is thread-safe.
*
- * @param startOffset A lower bound on the first offset to include in the
message set we read
+ * @param startOffset The logical log offset we are trying to read
* @param maxSize The maximum number of bytes to include in the message
set we read
* @param maxPositionOpt The maximum position in the log segment that
should be exposed for read
* @param minOneMessage If this is true, the first message will be
returned even if it exceeds `maxSize` (if one exists)
*
- * @return The fetched data and the offset metadata of the first message
whose offset is >= startOffset,
+ * @return The fetched data and the base offset metadata of the message
batch that contains startOffset,
* or null if the startOffset is larger than the largest offset in
this log
*/
public FetchDataInfo read(long startOffset, int maxSize, Optional<Long>
maxPositionOpt, boolean minOneMessage) throws IOException {
@@ -433,7 +433,7 @@ public class LogSegment implements Closeable {
return null;
int startPosition = startOffsetAndSize.position;
- LogOffsetMetadata offsetMetadata = new LogOffsetMetadata(startOffset,
this.baseOffset, startPosition);
+ LogOffsetMetadata offsetMetadata = new
LogOffsetMetadata(startOffsetAndSize.offset, this.baseOffset, startPosition);
int adjustedMaxSize = maxSize;
if (minOneMessage)
diff --git
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
index 695c19d4208..616671a6549 100644
---
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
+++
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
@@ -93,7 +93,7 @@ public class LogSegmentTest {
}
/* Create a ByteBufferMessageSet for the given messages starting from the
given offset */
- private MemoryRecords records(long offset, String... records) {
+ private MemoryRecords v1Records(long offset, String... records) {
List<SimpleRecord> simpleRecords = new ArrayList<>();
for (String s : records) {
simpleRecords.add(new SimpleRecord(offset * 10, s.getBytes()));
@@ -103,6 +103,16 @@ public class LogSegmentTest {
Compression.NONE, TimestampType.CREATE_TIME,
simpleRecords.toArray(new SimpleRecord[0]));
}
+ private MemoryRecords v2Records(long offset, String... records) {
+ List<SimpleRecord> simpleRecords = new ArrayList<>();
+ for (String s : records) {
+ simpleRecords.add(new SimpleRecord(offset * 10, s.getBytes()));
+ }
+ return MemoryRecords.withRecords(
+ RecordBatch.MAGIC_VALUE_V2, offset,
+ Compression.NONE, TimestampType.CREATE_TIME,
simpleRecords.toArray(new SimpleRecord[0]));
+ }
+
@BeforeEach
public void setup() {
logDir = TestUtils.tempDirectory();
@@ -134,7 +144,7 @@ public class LogSegmentTest {
public void testAppendForLogSegmentOffsetOverflowException(long
baseOffset, long largestOffset) throws IOException {
try (LogSegment seg = createSegment(baseOffset, 10, Time.SYSTEM)) {
long currentTime = Time.SYSTEM.milliseconds();
- MemoryRecords memoryRecords = records(0, "hello");
+ MemoryRecords memoryRecords = v1Records(0, "hello");
assertThrows(LogSegmentOffsetOverflowException.class, () ->
seg.append(largestOffset, currentTime, largestOffset, memoryRecords));
}
}
@@ -157,20 +167,35 @@ public class LogSegmentTest {
@Test
public void testReadBeforeFirstOffset() throws IOException {
try (LogSegment seg = createSegment(40)) {
- MemoryRecords ms = records(50, "hello", "there", "little", "bee");
+ MemoryRecords ms = v1Records(50, "hello", "there", "little",
"bee");
seg.append(53, RecordBatch.NO_TIMESTAMP, -1L, ms);
Records read = seg.read(41, 300).records;
checkEquals(ms.records().iterator(), read.records().iterator());
}
}
+ /**
+ * Reading from an offset in the middle of a batch should return a
+ * LogOffsetMetadata offset that points to the batch's base offset
+ */
+ @Test
+ public void testReadFromMiddleOfBatch() throws IOException {
+ long batchBaseOffset = 50;
+ try (LogSegment seg = createSegment(40)) {
+ MemoryRecords ms = v2Records(batchBaseOffset, "hello", "there",
"little", "bee");
+ seg.append(53, RecordBatch.NO_TIMESTAMP, -1L, ms);
+ FetchDataInfo readInfo = seg.read(52, 300);
+ assertEquals(batchBaseOffset,
readInfo.fetchOffsetMetadata.messageOffset);
+ }
+ }
+
/**
* If we read from an offset beyond the last offset in the segment we
should get null
*/
@Test
public void testReadAfterLast() throws IOException {
try (LogSegment seg = createSegment(40)) {
- MemoryRecords ms = records(50, "hello", "there");
+ MemoryRecords ms = v1Records(50, "hello", "there");
seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
FetchDataInfo read = seg.read(52, 200);
assertNull(read, "Read beyond the last offset in the segment
should give null");
@@ -184,9 +209,9 @@ public class LogSegmentTest {
@Test
public void testReadFromGap() throws IOException {
try (LogSegment seg = createSegment(40)) {
- MemoryRecords ms = records(50, "hello", "there");
+ MemoryRecords ms = v1Records(50, "hello", "there");
seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
- MemoryRecords ms2 = records(60, "alpha", "beta");
+ MemoryRecords ms2 = v1Records(60, "alpha", "beta");
seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2);
FetchDataInfo read = seg.read(55, 200);
checkEquals(ms2.records().iterator(),
read.records.records().iterator());
@@ -199,16 +224,11 @@ public class LogSegmentTest {
Optional<Long> maxPosition = Optional.empty();
int maxSize = 1;
try (LogSegment seg = createSegment(40)) {
- MemoryRecords ms = records(50, "hello", "there");
+ MemoryRecords ms = v1Records(50, "hello", "there");
seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
- // read before first offset
- FetchDataInfo read = seg.read(48, maxSize, maxPosition,
minOneMessage);
- assertEquals(new LogOffsetMetadata(48, 40, 0),
read.fetchOffsetMetadata);
- assertFalse(read.records.records().iterator().hasNext());
-
// read at first offset
- read = seg.read(50, maxSize, maxPosition, minOneMessage);
+ FetchDataInfo read = seg.read(50, maxSize, maxPosition,
minOneMessage);
assertEquals(new LogOffsetMetadata(50, 40, 0),
read.fetchOffsetMetadata);
assertFalse(read.records.records().iterator().hasNext());
@@ -236,9 +256,9 @@ public class LogSegmentTest {
try (LogSegment seg = createSegment(40)) {
long offset = 40;
for (int i = 0; i < 30; i++) {
- MemoryRecords ms1 = records(offset, "hello");
+ MemoryRecords ms1 = v1Records(offset, "hello");
seg.append(offset, RecordBatch.NO_TIMESTAMP, -1L, ms1);
- MemoryRecords ms2 = records(offset + 1, "hello");
+ MemoryRecords ms2 = v1Records(offset + 1, "hello");
seg.append(offset + 1, RecordBatch.NO_TIMESTAMP, -1L, ms2);
// check that we can read back both messages
@@ -297,10 +317,10 @@ public class LogSegmentTest {
@Test
public void testReloadLargestTimestampAndNextOffsetAfterTruncation()
throws IOException {
int numMessages = 30;
- try (LogSegment seg = createSegment(40, 2 * records(0,
"hello").sizeInBytes() - 1)) {
+ try (LogSegment seg = createSegment(40, 2 * v1Records(0,
"hello").sizeInBytes() - 1)) {
int offset = 40;
for (int i = 0; i < numMessages; i++) {
- seg.append(offset, offset, offset, records(offset, "hello"));
+ seg.append(offset, offset, offset, v1Records(offset, "hello"));
offset++;
}
assertEquals(offset, seg.readNextOffset());
@@ -323,7 +343,7 @@ public class LogSegmentTest {
MockTime time = new MockTime();
try (LogSegment seg = createSegment(40, time)) {
- seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello",
"there"));
+ seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, v1Records(40,
"hello", "there"));
// If the segment is empty after truncation, the create time
should be reset
time.sleep(500);
@@ -335,7 +355,7 @@ public class LogSegmentTest {
assertFalse(seg.offsetIndex().isFull());
assertNull(seg.read(0, 1024), "Segment should be empty.");
- seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello",
"there"));
+ seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, v1Records(40,
"hello", "there"));
}
}
@@ -344,11 +364,11 @@ public class LogSegmentTest {
*/
@Test
public void testFindOffsetByTimestamp() throws IOException {
- int messageSize = records(0, "msg00").sizeInBytes();
+ int messageSize = v1Records(0, "msg00").sizeInBytes();
try (LogSegment seg = createSegment(40, messageSize * 2 - 1)) {
// Produce some messages
for (int i = 40; i < 50; i++) {
- seg.append(i, i * 10, i, records(i, "msg" + i));
+ seg.append(i, i * 10, i, v1Records(i, "msg" + i));
}
assertEquals(490, seg.largestTimestamp());
@@ -374,7 +394,7 @@ public class LogSegmentTest {
public void testNextOffsetCalculation() throws IOException {
try (LogSegment seg = createSegment(40)) {
assertEquals(40, seg.readNextOffset());
- seg.append(52, RecordBatch.NO_TIMESTAMP, -1L, records(50, "hello",
"there", "you"));
+ seg.append(52, RecordBatch.NO_TIMESTAMP, -1L, v1Records(50,
"hello", "there", "you"));
assertEquals(53, seg.readNextOffset());
}
}
@@ -417,7 +437,7 @@ public class LogSegmentTest {
public void testRecoveryFixesCorruptIndex() throws Exception {
try (LogSegment seg = createSegment(0)) {
for (int i = 0; i < 100; i++) {
- seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, records(i,
Integer.toString(i)));
+ seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, v1Records(i,
Integer.toString(i)));
}
File indexFile = seg.offsetIndexFile();
writeNonsenseToFile(indexFile, 5, (int) indexFile.length());
@@ -547,7 +567,7 @@ public class LogSegmentTest {
public void testRecoveryFixesCorruptTimeIndex() throws IOException {
try (LogSegment seg = createSegment(0)) {
for (int i = 0; i < 100; i++) {
- seg.append(i, i * 10, i, records(i, String.valueOf(i)));
+ seg.append(i, i * 10, i, v1Records(i, String.valueOf(i)));
}
File timeIndexFile = seg.timeIndexFile();
writeNonsenseToFile(timeIndexFile, 5, (int)
timeIndexFile.length());
@@ -570,7 +590,7 @@ public class LogSegmentTest {
for (int ignore = 0; ignore < 10; ignore++) {
try (LogSegment seg = createSegment(0)) {
for (int i = 0; i < messagesAppended; i++) {
- seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, records(i,
String.valueOf(i)));
+ seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, v1Records(i,
String.valueOf(i)));
}
int offsetToBeginCorruption =
TestUtils.RANDOM.nextInt(messagesAppended);
// start corrupting somewhere in the middle of the chosen
record all the way to the end
@@ -606,9 +626,9 @@ public class LogSegmentTest {
try (LogSegment seg = LogSegment.open(tempDir, 40, logConfig,
Time.SYSTEM, false,
512 * 1024 * 1024, true, "")) {
segments.add(seg);
- MemoryRecords ms = records(50, "hello", "there");
+ MemoryRecords ms = v1Records(50, "hello", "there");
seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
- MemoryRecords ms2 = records(60, "alpha", "beta");
+ MemoryRecords ms2 = v1Records(60, "alpha", "beta");
seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2);
FetchDataInfo read = seg.read(55, 200);
checkEquals(ms2.records().iterator(),
read.records.records().iterator());
@@ -629,9 +649,9 @@ public class LogSegmentTest {
LogConfig logConfig = new LogConfig(configMap);
try (LogSegment seg = LogSegment.open(tempDir, 40, logConfig,
Time.SYSTEM, 512 * 1024 * 1024, true)) {
- MemoryRecords ms = records(50, "hello", "there");
+ MemoryRecords ms = v1Records(50, "hello", "there");
seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
- MemoryRecords ms2 = records(60, "alpha", "beta");
+ MemoryRecords ms2 = v1Records(60, "alpha", "beta");
seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2);
FetchDataInfo read = seg.read(55, 200);
checkEquals(ms2.records().iterator(),
read.records.records().iterator());
@@ -685,7 +705,7 @@ public class LogSegmentTest {
}
}
- private MemoryRecords records(long offset, int size) {
+ private MemoryRecords v2RecordWithSize(long offset, int size) {
return MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, offset,
Compression.NONE, TimestampType.CREATE_TIME,
new SimpleRecord(new byte[size]));
}
@@ -697,10 +717,10 @@ public class LogSegmentTest {
FileRecords fileRecords =
FileRecords.open(LogFileUtils.logFile(tempDir, 0));
// Simulate a scenario with log offset range exceeding
Integer.MAX_VALUE
- fileRecords.append(records(0, 1024));
- fileRecords.append(records(500, 1024 * 1024 + 1));
+ fileRecords.append(v2RecordWithSize(0, 1024));
+ fileRecords.append(v2RecordWithSize(500, 1024 * 1024 + 1));
long sizeBeforeOverflow = fileRecords.sizeInBytes();
- fileRecords.append(records(Integer.MAX_VALUE + 5L, 1024));
+ fileRecords.append(v2RecordWithSize(Integer.MAX_VALUE + 5L, 1024));
long sizeAfterOverflow = fileRecords.sizeInBytes();
try (LogSegment segment = createSegment(0)) {