This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3e7080f295b KAFKA-17512 Move LogSegmentTest to storage module  (#17174)
3e7080f295b is described below

commit 3e7080f295b831efbe8accc6892f7664fc9f89ee
Author: xijiu <[email protected]>
AuthorDate: Wed Sep 25 01:11:31 2024 +0800

    KAFKA-17512 Move LogSegmentTest to storage module  (#17174)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../test/scala/unit/kafka/log/LogSegmentTest.scala | 695 ------------------
 .../storage/internals/log/LogSegmentTest.java      | 789 +++++++++++++++++++++
 2 files changed, 789 insertions(+), 695 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
deleted file mode 100644
index 0355056e86b..00000000000
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ /dev/null
@@ -1,695 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.log
-
-import kafka.utils.TestUtils
-import kafka.utils.TestUtils.random
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.compress.Compression
-import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.common.record._
-import org.apache.kafka.common.utils.{MockTime, Time, Utils}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfig
-import org.apache.kafka.server.util.MockScheduler
-import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
-import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log._
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.function.Executable
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
-import org.mockito.Mockito.{doReturn, mock}
-
-import java.io.{File, RandomAccessFile}
-import java.util.{Optional, OptionalLong}
-import scala.collection._
-import scala.jdk.CollectionConverters._
-
-class LogSegmentTest {
-  private val topicPartition = new TopicPartition("topic", 0)
-  private val segments = mutable.ArrayBuffer[LogSegment]()
-  private var logDir: File = _
-
-  /* create a segment with the given base offset */
-  def createSegment(offset: Long,
-                    indexIntervalBytes: Int = 10,
-                    time: Time = Time.SYSTEM): LogSegment = {
-    val seg = LogTestUtils.createSegment(offset, logDir, indexIntervalBytes, time)
-    segments += seg
-    seg
-  }
-
-  /* create a ByteBufferMessageSet for the given messages starting from the given offset */
-  def records(offset: Long, records: String*): MemoryRecords = {
-    MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, offset, Compression.NONE, TimestampType.CREATE_TIME,
-      records.map { s => new SimpleRecord(offset * 10, s.getBytes) }: _*)
-  }
-
-  @BeforeEach
-  def setup(): Unit = {
-    logDir = TestUtils.tempDir()
-  }
-
-  @AfterEach
-  def teardown(): Unit = {
-    segments.foreach(_.close())
-    Utils.delete(logDir)
-  }
-
-  /**
-   * LogSegmentOffsetOverflowException should be thrown while appending the logs if:
-   * 1. largestOffset - baseOffset < 0
-   * 2. largestOffset - baseOffset > Integer.MAX_VALUE
-   */
-  @ParameterizedTest
-  @CsvSource(Array(
-    "0, -2147483648",
-    "0, 2147483648",
-    "1, 0",
-    "100, 10",
-    "2147483648, 0",
-    "-2147483648, 0",
-    "2147483648,4294967296"
-  ))
-  def testAppendForLogSegmentOffsetOverflowException(baseOffset: Long, largestOffset: Long): Unit = {
-    val seg = createSegment(baseOffset)
-    val currentTime = Time.SYSTEM.milliseconds()
-    val shallowOffsetOfMaxTimestamp = largestOffset
-    val memoryRecords = records(0, "hello")
-    assertThrows(classOf[LogSegmentOffsetOverflowException], () => {
-      seg.append(largestOffset, currentTime, shallowOffsetOfMaxTimestamp, memoryRecords)
-    })
-  }
-
-  /**
-   * A read on an empty log segment should return null
-   */
-  @Test
-  def testReadOnEmptySegment(): Unit = {
-    val seg = createSegment(40)
-    val read = seg.read(40, 300)
-    assertNull(read, "Read beyond the last offset in the segment should be null")
-  }
-
-  /**
-   * Reading from before the first offset in the segment should return messages
-   * beginning with the first message in the segment
-   */
-  @Test
-  def testReadBeforeFirstOffset(): Unit = {
-    val seg = createSegment(40)
-    val ms = records(50, "hello", "there", "little", "bee")
-    seg.append(53, RecordBatch.NO_TIMESTAMP, -1L, ms)
-    val read = seg.read(41, 300).records
-    checkEquals(ms.records.iterator, read.records.iterator)
-  }
-
-  /**
-   * If we read from an offset beyond the last offset in the segment we should get null
-   */
-  @Test
-  def testReadAfterLast(): Unit = {
-    val seg = createSegment(40)
-    val ms = records(50, "hello", "there")
-    seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
-    val read = seg.read(52, 200)
-    assertNull(read, "Read beyond the last offset in the segment should give null")
-  }
-
-  /**
-   * If we read from an offset which doesn't exist we should get a message set beginning
-   * with the least offset greater than the given startOffset.
-   */
-  @Test
-  def testReadFromGap(): Unit = {
-    val seg = createSegment(40)
-    val ms = records(50, "hello", "there")
-    seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
-    val ms2 = records(60, "alpha", "beta")
-    seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2)
-    val read = seg.read(55, 200)
-    checkEquals(ms2.records.iterator, read.records.records.iterator)
-  }
-
-  @ParameterizedTest(name = "testReadWhenNoMaxPosition minOneMessage = {0}")
-  @ValueSource(booleans = Array(true, false))
-  def testReadWhenNoMaxPosition(minOneMessage: Boolean): Unit = {
-    val maxPosition: Optional[java.lang.Long] = Optional.empty()
-    val maxSize = 1
-    val seg = createSegment(40)
-    val ms = records(50, "hello", "there")
-    seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
-    // read before first offset
-    var read = seg.read(48, maxSize, maxPosition, minOneMessage)
-    assertEquals(new LogOffsetMetadata(48, 40, 0), read.fetchOffsetMetadata)
-    assertTrue(read.records.records().iterator().asScala.isEmpty)
-    // read at first offset
-    read = seg.read(50, maxSize, maxPosition, minOneMessage)
-    assertEquals(new LogOffsetMetadata(50, 40, 0), read.fetchOffsetMetadata)
-    assertTrue(read.records.records().iterator().asScala.isEmpty)
-    // read at last offset
-    read = seg.read(51, maxSize, maxPosition, minOneMessage)
-    assertEquals(new LogOffsetMetadata(51, 40, 39), read.fetchOffsetMetadata)
-    assertTrue(read.records.records().iterator().asScala.isEmpty)
-    // read at log-end-offset
-    read = seg.read(52, maxSize, maxPosition, minOneMessage)
-    assertNull(read)
-    // read beyond log-end-offset
-    read = seg.read(53, maxSize, maxPosition, minOneMessage)
-    assertNull(read)
-  }
-
-  /**
-   * In a loop append two messages then truncate off the second of those messages and check that we can read
-   * the first but not the second message.
-   */
-  @Test
-  def testTruncate(): Unit = {
-    val seg = createSegment(40)
-    var offset = 40
-    for (_ <- 0 until 30) {
-      val ms1 = records(offset, "hello")
-      seg.append(offset, RecordBatch.NO_TIMESTAMP, -1L, ms1)
-      val ms2 = records(offset + 1, "hello")
-      seg.append(offset + 1, RecordBatch.NO_TIMESTAMP, -1L, ms2)
-      // check that we can read back both messages
-      val read = seg.read(offset, 10000)
-      assertEquals(List(ms1.records.iterator.next(), ms2.records.iterator.next()), read.records.records.asScala.toList)
-      // now truncate off the last message
-      seg.truncateTo(offset + 1)
-      val read2 = seg.read(offset, 10000)
-      assertEquals(1, read2.records.records.asScala.size)
-      checkEquals(ms1.records.iterator, read2.records.records.iterator)
-      offset += 1
-    }
-  }
-
-  @Test
-  def testTruncateEmptySegment(): Unit = {
-    // This tests the scenario in which the follower truncates to an empty segment. In this
-    // case we must ensure that the index is resized so that the log segment is not mistakenly
-    // rolled due to a full index
-
-    val maxSegmentMs = 300000
-    val time = new MockTime
-    val seg = createSegment(0, time = time)
-    // Force load indexes before closing the segment
-    seg.timeIndex
-    seg.offsetIndex
-    seg.close()
-
-    val reopened = createSegment(0, time = time)
-    assertEquals(0, seg.timeIndex.sizeInBytes)
-    assertEquals(0, seg.offsetIndex.sizeInBytes)
-
-    time.sleep(500)
-    reopened.truncateTo(57)
-    assertEquals(0, reopened.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP))
-    assertFalse(reopened.timeIndex.isFull)
-    assertFalse(reopened.offsetIndex.isFull)
-
-    var rollParams = new RollParams(maxSegmentMs, Int.MaxValue, RecordBatch.NO_TIMESTAMP, 100L, 1024,
-      time.milliseconds())
-    assertFalse(reopened.shouldRoll(rollParams))
-
-    // The segment should not be rolled even if maxSegmentMs has been exceeded
-    time.sleep(maxSegmentMs + 1)
-    assertEquals(maxSegmentMs + 1, reopened.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP))
-    rollParams = new RollParams(maxSegmentMs, Int.MaxValue, RecordBatch.NO_TIMESTAMP, 100L, 1024, time.milliseconds())
-    assertFalse(reopened.shouldRoll(rollParams))
-
-    // But we should still roll the segment if we cannot fit the next offset
-    rollParams = new RollParams(maxSegmentMs, Int.MaxValue, RecordBatch.NO_TIMESTAMP,
-      Int.MaxValue.toLong + 200L, 1024, time.milliseconds())
-    assertTrue(reopened.shouldRoll(rollParams))
-  }
-
-  @Test
-  def testReloadLargestTimestampAndNextOffsetAfterTruncation(): Unit = {
-    val numMessages = 30
-    val seg = createSegment(40, 2 * records(0, "hello").sizeInBytes - 1)
-    var offset = 40
-    for (_ <- 0 until numMessages) {
-      seg.append(offset, offset, offset, records(offset, "hello"))
-      offset += 1
-    }
-    assertEquals(offset, seg.readNextOffset)
-
-    val expectedNumEntries = numMessages / 2 - 1
-    assertEquals(expectedNumEntries, seg.timeIndex.entries, s"Should have $expectedNumEntries time indexes")
-
-    seg.truncateTo(41)
-    assertEquals(0, seg.timeIndex.entries, s"Should have 0 time indexes")
-    assertEquals(400L, seg.largestTimestamp, s"Largest timestamp should be 400")
-    assertEquals(41, seg.readNextOffset)
-  }
-
-  /**
-   * Test truncating the whole segment, and check that we can reappend with the original offset.
-   */
-  @Test
-  def testTruncateFull(): Unit = {
-    // test the case where we fully truncate the log
-    val time = new MockTime
-    val seg = createSegment(40, time = time)
-    seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
-
-    // If the segment is empty after truncation, the create time should be reset
-    time.sleep(500)
-    assertEquals(500, seg.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP))
-
-    seg.truncateTo(0)
-    assertEquals(0, seg.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP))
-    assertFalse(seg.timeIndex.isFull)
-    assertFalse(seg.offsetIndex.isFull)
-    assertNull(seg.read(0, 1024), "Segment should be empty.")
-
-    seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
-  }
-
-  /**
-   * Append messages with timestamp and search message by timestamp.
-   */
-  @Test
-  def testFindOffsetByTimestamp(): Unit = {
-    val messageSize = records(0, s"msg00").sizeInBytes
-    val seg = createSegment(40, messageSize * 2 - 1)
-    // Produce some messages
-    for (i <- 40 until 50)
-      seg.append(i, i * 10, i, records(i, s"msg$i"))
-
-    assertEquals(490, seg.largestTimestamp)
-    // Search for an indexed timestamp
-    assertEquals(42, seg.findOffsetByTimestamp(420, 0L).get.offset)
-    assertEquals(43, seg.findOffsetByTimestamp(421, 0L).get.offset)
-    // Search for an un-indexed timestamp
-    assertEquals(43, seg.findOffsetByTimestamp(430, 0L).get.offset)
-    assertEquals(44, seg.findOffsetByTimestamp(431, 0L).get.offset)
-    // Search beyond the last timestamp
-    assertEquals(Optional.empty(), seg.findOffsetByTimestamp(491, 0L))
-    // Search before the first indexed timestamp
-    assertEquals(41, seg.findOffsetByTimestamp(401, 0L).get.offset)
-    // Search before the first timestamp
-    assertEquals(40, seg.findOffsetByTimestamp(399, 0L).get.offset)
-  }
-
-  /**
-   * Test that offsets are assigned sequentially and that the nextOffset variable is incremented
-   */
-  @Test
-  def testNextOffsetCalculation(): Unit = {
-    val seg = createSegment(40)
-    assertEquals(40, seg.readNextOffset)
-    seg.append(52, RecordBatch.NO_TIMESTAMP, -1L, records(50, "hello", "there", "you"))
-    assertEquals(53, seg.readNextOffset)
-  }
-
-  /**
-   * Test that we can change the file suffixes for the log and index files
-   */
-  @Test
-  def testChangeFileSuffixes(): Unit = {
-    val seg = createSegment(40)
-    val logFile = seg.log.file
-    val indexFile = seg.offsetIndexFile
-    val timeIndexFile = seg.timeIndexFile
-    // Ensure that files for offset and time indices have not been created eagerly.
-    assertFalse(seg.offsetIndexFile.exists)
-    assertFalse(seg.timeIndexFile.exists)
-    seg.changeFileSuffixes("", ".deleted")
-    // Ensure that attempt to change suffixes for non-existing offset and time indices does not create new files.
-    assertFalse(seg.offsetIndexFile.exists)
-    assertFalse(seg.timeIndexFile.exists)
-    // Ensure that file names are updated accordingly.
-    assertEquals(logFile.getAbsolutePath + ".deleted", seg.log.file.getAbsolutePath)
-    assertEquals(indexFile.getAbsolutePath + ".deleted", seg.offsetIndexFile.getAbsolutePath)
-    assertEquals(timeIndexFile.getAbsolutePath + ".deleted", seg.timeIndexFile.getAbsolutePath)
-    assertTrue(seg.log.file.exists)
-    // Ensure lazy creation of offset index file upon accessing it.
-    seg.offsetIndex()
-    assertTrue(seg.offsetIndexFile.exists)
-    // Ensure lazy creation of time index file upon accessing it.
-    seg.timeIndex()
-    assertTrue(seg.timeIndexFile.exists)
-  }
-
-  /**
-   * Create a segment with some data and an index. Then corrupt the index,
-   * and recover the segment, the entries should all be readable.
-   */
-  @Test
-  def testRecoveryFixesCorruptIndex(): Unit = {
-    val seg = createSegment(0)
-    for (i <- 0 until 100)
-      seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, records(i, i.toString))
-    val indexFile = seg.offsetIndexFile
-    writeNonsenseToFile(indexFile, 5, indexFile.length.toInt)
-    seg.recover(newProducerStateManager(), Optional.empty())
-    for (i <- 0 until 100) {
-      val records = seg.read(i, 1, Optional.of(seg.size()), true).records.records
-      assertEquals(i, records.iterator.next().offset)
-    }
-  }
-
-  @Test
-  def testRecoverTransactionIndex(): Unit = {
-    val segment = createSegment(100)
-    val producerEpoch = 0.toShort
-    val partitionLeaderEpoch = 15
-    val sequence = 100
-
-    val pid1 = 5L
-    val pid2 = 10L
-
-    // append transactional records from pid1
-    segment.append(101L, RecordBatch.NO_TIMESTAMP,
-      100L, MemoryRecords.withTransactionalRecords(100L, Compression.NONE,
-        pid1, producerEpoch, sequence, partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
-
-    // append transactional records from pid2
-    segment.append(103L, RecordBatch.NO_TIMESTAMP, 102L, MemoryRecords.withTransactionalRecords(102L, Compression.NONE,
-        pid2, producerEpoch, sequence, partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
-
-    // append non-transactional records
-    segment.append(105L, RecordBatch.NO_TIMESTAMP, 104L, MemoryRecords.withRecords(104L, Compression.NONE,
-        partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
-
-    // abort the transaction from pid2 (note LSO should be 100L since the txn from pid1 has not completed)
-    segment.append(106L, RecordBatch.NO_TIMESTAMP, 106L,
-      endTxnRecords(ControlRecordType.ABORT, pid2, producerEpoch, offset = 106L))
-
-    // commit the transaction from pid1
-    segment.append(107L, RecordBatch.NO_TIMESTAMP, 107L,
-      endTxnRecords(ControlRecordType.COMMIT, pid1, producerEpoch, offset = 107L))
-
-    var stateManager = newProducerStateManager()
-    segment.recover(stateManager, Optional.empty())
-    assertEquals(108L, stateManager.mapEndOffset)
-
-
-    var abortedTxns = segment.txnIndex.allAbortedTxns
-    assertEquals(1, abortedTxns.size)
-    var abortedTxn = abortedTxns.get(0)
-    assertEquals(pid2, abortedTxn.producerId)
-    assertEquals(102L, abortedTxn.firstOffset)
-    assertEquals(106L, abortedTxn.lastOffset)
-    assertEquals(100L, abortedTxn.lastStableOffset)
-
-    // recover again, but this time assuming the transaction from pid2 began on a previous segment
-    stateManager = newProducerStateManager()
-    stateManager.loadProducerEntry(new ProducerStateEntry(pid2, producerEpoch, 0, RecordBatch.NO_TIMESTAMP, OptionalLong.of(75L), java.util.Optional.of(new BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP))))
-    segment.recover(stateManager, Optional.empty())
-    assertEquals(108L, stateManager.mapEndOffset)
-
-    abortedTxns = segment.txnIndex.allAbortedTxns
-    assertEquals(1, abortedTxns.size)
-    abortedTxn = abortedTxns.get(0)
-    assertEquals(pid2, abortedTxn.producerId)
-    assertEquals(75L, abortedTxn.firstOffset)
-    assertEquals(106L, abortedTxn.lastOffset)
-    assertEquals(100L, abortedTxn.lastStableOffset)
-  }
-
-  /**
-   * Create a segment with some data, then recover the segment.
-   * The epoch cache entries should reflect the segment.
-   */
-  @Test
-  def testRecoveryRebuildsEpochCache(): Unit = {
-    val seg = createSegment(0)
-
-    val checkpoint: LeaderEpochCheckpointFile = new LeaderEpochCheckpointFile(TestUtils.tempFile(), new LogDirFailureChannel(1))
-
-    val cache = new LeaderEpochFileCache(topicPartition, checkpoint, new MockScheduler(new MockTime()))
-    seg.append(105L, RecordBatch.NO_TIMESTAMP, 104L, MemoryRecords.withRecords(104L, Compression.NONE, 0,
-        new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
-
-    seg.append(107L, RecordBatch.NO_TIMESTAMP, 106L, MemoryRecords.withRecords(106L, Compression.NONE, 1,
-        new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
-
-    seg.append(109L, RecordBatch.NO_TIMESTAMP, 108L, MemoryRecords.withRecords(108L, Compression.NONE, 1,
-        new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
-
-    seg.append(111L, RecordBatch.NO_TIMESTAMP, 110, MemoryRecords.withRecords(110L, Compression.NONE, 2,
-        new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
-
-    seg.recover(newProducerStateManager(), Optional.of(cache))
-    assertEquals(java.util.Arrays.asList(new EpochEntry(0, 104L),
-                             new EpochEntry(1, 106),
-                             new EpochEntry(2, 110)),
-      cache.epochEntries)
-  }
-
-  private def endTxnRecords(controlRecordType: ControlRecordType,
-                            producerId: Long,
-                            producerEpoch: Short,
-                            offset: Long,
-                            partitionLeaderEpoch: Int = 0,
-                            coordinatorEpoch: Int = 0,
-                            timestamp: Long = RecordBatch.NO_TIMESTAMP): MemoryRecords = {
-    val marker = new EndTransactionMarker(controlRecordType, coordinatorEpoch)
-    MemoryRecords.withEndTransactionMarker(offset, timestamp, partitionLeaderEpoch, producerId, producerEpoch, marker)
-  }
-
-  /**
-   * Create a segment with some data and an index. Then corrupt the index,
-   * and recover the segment, the entries should all be readable.
-   */
-  @Test
-  def testRecoveryFixesCorruptTimeIndex(): Unit = {
-    val seg = createSegment(0)
-    for (i <- 0 until 100)
-      seg.append(i, i * 10, i, records(i, i.toString))
-    val timeIndexFile = seg.timeIndexFile
-    writeNonsenseToFile(timeIndexFile, 5, timeIndexFile.length.toInt)
-    seg.recover(newProducerStateManager(), Optional.empty())
-    for (i <- 0 until 100) {
-      assertEquals(i, seg.findOffsetByTimestamp(i * 10, 0L).get.offset)
-      if (i < 99)
-        assertEquals(i + 1, seg.findOffsetByTimestamp(i * 10 + 1, 0L).get.offset)
-    }
-  }
-
-  /**
-   * Randomly corrupt a log a number of times and attempt recovery.
-   */
-  @Test
-  def testRecoveryWithCorruptMessage(): Unit = {
-    val messagesAppended = 20
-    for (_ <- 0 until 10) {
-      val seg = createSegment(0)
-      for (i <- 0 until messagesAppended)
-        seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, records(i, i.toString))
-      val offsetToBeginCorruption = TestUtils.random.nextInt(messagesAppended)
-      // start corrupting somewhere in the middle of the chosen record all the way to the end
-
-      val recordPosition = seg.log.searchForOffsetWithSize(offsetToBeginCorruption, 0)
-      val position = recordPosition.position + TestUtils.random.nextInt(15)
-      writeNonsenseToFile(seg.log.file, position, (seg.log.file.length - position).toInt)
-      seg.recover(newProducerStateManager(), Optional.empty())
-      assertEquals((0 until offsetToBeginCorruption).toList, seg.log.batches.asScala.map(_.lastOffset).toList,
-        "Should have truncated off bad messages.")
-      seg.deleteIfExists()
-    }
-  }
-
-  private def createSegment(baseOffset: Long, fileAlreadyExists: Boolean, initFileSize: Int, preallocate: Boolean): LogSegment = {
-    val tempDir = TestUtils.tempDir()
-    val logConfig = new LogConfig(Map(
-      TopicConfig.INDEX_INTERVAL_BYTES_CONFIG -> 10,
-      TopicConfig.SEGMENT_INDEX_BYTES_CONFIG -> 1000,
-      TopicConfig.SEGMENT_JITTER_MS_CONFIG -> 0
-    ).asJava)
-    val seg = LogSegment.open(tempDir, baseOffset, logConfig, Time.SYSTEM, fileAlreadyExists, initFileSize, preallocate, "")
-    segments += seg
-    seg
-  }
-
-  /* create a segment with   pre allocate, put message to it and verify */
-  @Test
-  def testCreateWithInitFileSizeAppendMessage(): Unit = {
-    val seg = createSegment(40, fileAlreadyExists = false, 512*1024*1024, preallocate = true)
-    val ms = records(50, "hello", "there")
-    seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
-    val ms2 = records(60, "alpha", "beta")
-    seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2)
-    val read = seg.read(55, 200)
-    checkEquals(ms2.records.iterator, read.records.records.iterator)
-  }
-
-  /* create a segment with   pre allocate and clearly shut down*/
-  @Test
-  def testCreateWithInitFileSizeClearShutdown(): Unit = {
-    val tempDir = TestUtils.tempDir()
-    val logConfig = new LogConfig(Map(
-      TopicConfig.INDEX_INTERVAL_BYTES_CONFIG -> 10,
-      TopicConfig.SEGMENT_INDEX_BYTES_CONFIG -> 1000,
-      TopicConfig.SEGMENT_JITTER_MS_CONFIG -> 0
-    ).asJava)
-
-    val seg = LogSegment.open(tempDir, 40, logConfig, Time.SYSTEM,
-      512 * 1024 * 1024, true)
-
-    val ms = records(50, "hello", "there")
-    seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
-    val ms2 = records(60, "alpha", "beta")
-    seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2)
-    val read = seg.read(55, 200)
-    checkEquals(ms2.records.iterator, read.records.records.iterator)
-    val oldSize = seg.log.sizeInBytes()
-    val oldPosition = seg.log.channel.position
-    val oldFileSize = seg.log.file.length
-    assertEquals(512*1024*1024, oldFileSize)
-    seg.close()
-    //After close, file should be trimmed
-    assertEquals(oldSize, seg.log.file.length)
-
-    val segReopen = LogSegment.open(tempDir, 40, logConfig, Time.SYSTEM, true, 512 * 1024 * 1024, true, "")
-    segments += segReopen
-
-    val readAgain = segReopen.read(55, 200)
-    checkEquals(ms2.records.iterator, readAgain.records.records.iterator)
-    val size = segReopen.log.sizeInBytes()
-    val position = segReopen.log.channel.position
-    val fileSize = segReopen.log.file.length
-    assertEquals(oldPosition, position)
-    assertEquals(oldSize, size)
-    assertEquals(size, fileSize)
-  }
-
-  @Test
-  def shouldTruncateEvenIfOffsetPointsToAGapInTheLog(): Unit = {
-    val seg = createSegment(40)
-    val offset = 40
-
-    def records(offset: Long, record: String): MemoryRecords =
-      MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, offset, Compression.NONE, TimestampType.CREATE_TIME,
-        new SimpleRecord(offset * 1000, record.getBytes))
-
-    //Given two messages with a gap between them (e.g. mid offset compacted away)
-    val ms1 = records(offset, "first message")
-    seg.append(offset, RecordBatch.NO_TIMESTAMP, -1L, ms1)
-    val ms2 = records(offset + 3, "message after gap")
-    seg.append(offset + 3, RecordBatch.NO_TIMESTAMP, -1L, ms2)
-
-    // When we truncate to an offset without a corresponding log entry
-    seg.truncateTo(offset + 1)
-
-    //Then we should still truncate the record that was present (i.e. offset + 3 is gone)
-    val log = seg.read(offset, 10000)
-    assertEquals(offset, log.records.batches.iterator.next().baseOffset())
-    assertEquals(1, log.records.batches.asScala.size)
-  }
-
-  @Test
-  def testAppendFromFile(): Unit = {
-    def records(offset: Long, size: Int): MemoryRecords =
-      MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, offset, Compression.NONE, TimestampType.CREATE_TIME,
-        new SimpleRecord(new Array[Byte](size)))
-
-    // create a log file in a separate directory to avoid conflicting with created segments
-    val tempDir = TestUtils.tempDir()
-    val fileRecords = FileRecords.open(LogFileUtils.logFile(tempDir, 0))
-
-    // Simulate a scenario where we have a single log with an offset range exceeding Int.MaxValue
-    fileRecords.append(records(0, 1024))
-    fileRecords.append(records(500, 1024 * 1024 + 1))
-    val sizeBeforeOverflow = fileRecords.sizeInBytes()
-    fileRecords.append(records(Int.MaxValue + 5L, 1024))
-    val sizeAfterOverflow = fileRecords.sizeInBytes()
-
-    val segment = createSegment(0)
-    val bytesAppended = segment.appendFromFile(fileRecords, 0)
-    assertEquals(sizeBeforeOverflow, bytesAppended)
-    assertEquals(sizeBeforeOverflow, segment.size)
-
-    val overflowSegment = createSegment(Int.MaxValue)
-    val overflowBytesAppended = overflowSegment.appendFromFile(fileRecords, sizeBeforeOverflow)
-    assertEquals(sizeAfterOverflow - sizeBeforeOverflow, overflowBytesAppended)
-    assertEquals(overflowBytesAppended, overflowSegment.size)
-
-    Utils.delete(tempDir)
-  }
-
-  @Test
-  def testGetFirstBatchTimestamp(): Unit = {
-    val segment = createSegment(1)
-    assertEquals(Long.MaxValue, segment.getFirstBatchTimestamp)
-
-    segment.append(1, 1000L, 1, MemoryRecords.withRecords(1, Compression.NONE, new SimpleRecord("one".getBytes)))
-    assertEquals(1000L, segment.getFirstBatchTimestamp)
-
-    segment.close()
-  }
-
-  @Test
-  def testDeleteIfExistsWithGetParentIsNull(): Unit = {
-    val log = mock(classOf[FileRecords])
-    val lazyOffsetIndex = mock(classOf[LazyIndex[OffsetIndex]])
-    val lazyTimeIndex = mock(classOf[LazyIndex[TimeIndex]])
-    val transactionIndex = mock(classOf[TransactionIndex])
-    val segment = new LogSegment(log, lazyOffsetIndex, lazyTimeIndex, transactionIndex, 0, 10, 100, Time.SYSTEM)
-
-    // Use "Nil: _*" as workaround for jdk 8
-    doReturn(true, Nil: _*).when(log).deleteIfExists()
-    doReturn(true, Nil: _*).when(lazyOffsetIndex).deleteIfExists()
-    doReturn(true, Nil: _*).when(lazyTimeIndex).deleteIfExists()
-    doReturn(false, Nil: _*).when(transactionIndex).deleteIfExists()
-
-    val mockFile = mock(classOf[File])
-    doReturn("/test/path", Nil: _*).when(mockFile).getAbsolutePath
-    doReturn(mockFile, Nil: _*).when(log).file()
-    doReturn(mockFile, Nil: _*).when(lazyOffsetIndex).file()
-    doReturn(mockFile, Nil: _*).when(lazyTimeIndex).file()
-
-    val transactionIndexFile = new File("/")
-    doReturn(transactionIndexFile, Nil: _*).when(transactionIndex).file()
-    assertDoesNotThrow(new Executable {
-      override def execute(): Unit = segment.deleteIfExists()
-    }, "Should not throw exception when transactionIndex.deleteIfExists() returns false")
-  }
-
-  private def newProducerStateManager(): ProducerStateManager = {
-    new ProducerStateManager(
-      topicPartition,
-      logDir,
-      5 * 60 * 1000,
-      new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false),
-      new MockTime()
-    )
-  }
-
-  private def checkEquals[T](s1: java.util.Iterator[T], s2: java.util.Iterator[T]): Unit = {
-    while (s1.hasNext && s2.hasNext)
-      assertEquals(s1.next, s2.next)
-    assertFalse(s1.hasNext, "Iterators have uneven length--first has more")
-    assertFalse(s2.hasNext, "Iterators have uneven length--second has more")
-  }
-
-  private def writeNonsenseToFile(fileName: File, position: Long, size: Int): Unit = {
-    val file = new RandomAccessFile(fileName, "rw")
-    try {
-      file.seek(position)
-      for (_ <- 0 until size)
-        file.writeByte(random.nextInt(255))
-    } finally {
-      file.close()
-    }
-  }
-
-}
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
new file mode 100644
index 00000000000..695c19d4208
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
@@ -0,0 +1,789 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.FileLogInputStream;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.server.util.MockScheduler;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertIterableEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class LogSegmentTest {
+    private final TopicPartition topicPartition = new TopicPartition("topic", 0);
+    private final List<LogSegment> segments = new ArrayList<>();
+    private File logDir = null;
+
+    /* Create a segment with the given base offset */
+    private LogSegment createSegment(long offset, int indexIntervalBytes, Time time) throws IOException {
+        LogSegment seg = LogTestUtils.createSegment(offset, logDir, indexIntervalBytes, time);
+        segments.add(seg);
+        return seg;
+    }
+
+    private LogSegment createSegment(long offset) throws IOException {
+        return createSegment(offset, 10, Time.SYSTEM);
+    }
+
+    private LogSegment createSegment(long offset, Time time) throws IOException {
+        return createSegment(offset, 10, time);
+    }
+
+    private LogSegment createSegment(long offset, int indexIntervalBytes) throws IOException {
+        return createSegment(offset, indexIntervalBytes, Time.SYSTEM);
+    }
+
+    /* Create a ByteBufferMessageSet for the given messages starting from the given offset */
+    private MemoryRecords records(long offset, String... records) {
+        List<SimpleRecord> simpleRecords = new ArrayList<>();
+        for (String s : records) {
+            simpleRecords.add(new SimpleRecord(offset * 10, s.getBytes()));
+        }
+        return MemoryRecords.withRecords(
+            RecordBatch.MAGIC_VALUE_V1, offset,
+            Compression.NONE, TimestampType.CREATE_TIME, simpleRecords.toArray(new SimpleRecord[0]));
+    }
+
+    @BeforeEach
+    public void setup() {
+        logDir = TestUtils.tempDirectory();
+    }
+
+    @AfterEach
+    public void teardown() throws IOException {
+        for (LogSegment segment : segments) {
+            segment.close();
+        }
+        Utils.delete(logDir);
+    }
+
+    /**
+     * LogSegmentOffsetOverflowException should be thrown while appending the logs if:
+     * 1. largestOffset - baseOffset < 0
+     * 2. largestOffset - baseOffset > Integer.MAX_VALUE
+     */
+    @ParameterizedTest
+    @CsvSource({
+        "0, -2147483648",
+        "0, 2147483648",
+        "1, 0",
+        "100, 10",
+        "2147483648, 0",
+        "-2147483648, 0",
+        "2147483648, 4294967296"
+    })
+    public void testAppendForLogSegmentOffsetOverflowException(long baseOffset, long largestOffset) throws IOException {
+        try (LogSegment seg = createSegment(baseOffset, 10, Time.SYSTEM)) {
+            long currentTime = Time.SYSTEM.milliseconds();
+            MemoryRecords memoryRecords = records(0, "hello");
+            assertThrows(LogSegmentOffsetOverflowException.class, () -> seg.append(largestOffset, currentTime, largestOffset, memoryRecords));
+        }
+    }
+
+    /**
+     * A read on an empty log segment should return null
+     */
+    @Test
+    public void testReadOnEmptySegment() throws IOException {
+        try (LogSegment seg = createSegment(40)) {
+            FetchDataInfo read = seg.read(40, 300);
+            assertNull(read, "Read beyond the last offset in the segment should be null");
+        }
+    }
+
+    /**
+     * Reading from before the first offset in the segment should return messages
+     * beginning with the first message in the segment
+     */
+    @Test
+    public void testReadBeforeFirstOffset() throws IOException {
+        try (LogSegment seg = createSegment(40)) {
+            MemoryRecords ms = records(50, "hello", "there", "little", "bee");
+            seg.append(53, RecordBatch.NO_TIMESTAMP, -1L, ms);
+            Records read = seg.read(41, 300).records;
+            checkEquals(ms.records().iterator(), read.records().iterator());
+        }
+    }
+
+    /**
+     * If we read from an offset beyond the last offset in the segment we should get null
+     */
+    @Test
+    public void testReadAfterLast() throws IOException {
+        try (LogSegment seg = createSegment(40)) {
+            MemoryRecords ms = records(50, "hello", "there");
+            seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
+            FetchDataInfo read = seg.read(52, 200);
+            assertNull(read, "Read beyond the last offset in the segment should give null");
+        }
+    }
+
+    /**
+     * If we read from an offset which doesn't exist we should get a message set beginning
+     * with the least offset greater than the given startOffset.
+     */
+    @Test
+    public void testReadFromGap() throws IOException {
+        try (LogSegment seg = createSegment(40)) {
+            MemoryRecords ms = records(50, "hello", "there");
+            seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
+            MemoryRecords ms2 = records(60, "alpha", "beta");
+            seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2);
+            FetchDataInfo read = seg.read(55, 200);
+            checkEquals(ms2.records().iterator(), read.records.records().iterator());
+        }
+    }
+
+    @ParameterizedTest(name = "testReadWhenNoMaxPosition minOneMessage = {0}")
+    @ValueSource(booleans = {true, false})
+    public void testReadWhenNoMaxPosition(boolean minOneMessage) throws IOException {
+        Optional<Long> maxPosition = Optional.empty();
+        int maxSize = 1;
+        try (LogSegment seg = createSegment(40)) {
+            MemoryRecords ms = records(50, "hello", "there");
+            seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
+
+            // read before first offset
+            FetchDataInfo read = seg.read(48, maxSize, maxPosition, minOneMessage);
+            assertEquals(new LogOffsetMetadata(48, 40, 0), read.fetchOffsetMetadata);
+            assertFalse(read.records.records().iterator().hasNext());
+
+            // read at first offset
+            read = seg.read(50, maxSize, maxPosition, minOneMessage);
+            assertEquals(new LogOffsetMetadata(50, 40, 0), read.fetchOffsetMetadata);
+            assertFalse(read.records.records().iterator().hasNext());
+
+            // read at last offset
+            read = seg.read(51, maxSize, maxPosition, minOneMessage);
+            assertEquals(new LogOffsetMetadata(51, 40, 39), read.fetchOffsetMetadata);
+            assertFalse(read.records.records().iterator().hasNext());
+
+            // read at log-end-offset
+            read = seg.read(52, maxSize, maxPosition, minOneMessage);
+            assertNull(read);
+
+            // read beyond log-end-offset
+            read = seg.read(53, maxSize, maxPosition, minOneMessage);
+            assertNull(read);
+        }
+    }
+
+    /**
+     * In a loop append two messages then truncate off the second of those messages and check that we can read
+     * the first but not the second message.
+     */
+    @Test
+    public void testTruncate() throws IOException {
+        try (LogSegment seg = createSegment(40)) {
+            long offset = 40;
+            for (int i = 0; i < 30; i++) {
+                MemoryRecords ms1 = records(offset, "hello");
+                seg.append(offset, RecordBatch.NO_TIMESTAMP, -1L, ms1);
+                MemoryRecords ms2 = records(offset + 1, "hello");
+                seg.append(offset + 1, RecordBatch.NO_TIMESTAMP, -1L, ms2);
+
+                // check that we can read back both messages
+                FetchDataInfo read = seg.read(offset, 10000);
+                assertIterableEquals(Arrays.asList(ms1.records().iterator().next(), ms2.records().iterator().next()), read.records.records());
+
+                // Now truncate off the last message
+                seg.truncateTo(offset + 1);
+                FetchDataInfo read2 = seg.read(offset, 10000);
+                assertIterableEquals(ms1.records(), read2.records.records());
+                offset += 1;
+            }
+        }
+    }
+
+    @Test
+    public void testTruncateEmptySegment() throws IOException {
+        // This tests the scenario in which the follower truncates to an empty segment. In this
+        // case we must ensure that the index is resized so that the log segment is not mistakenly
+        // rolled due to a full index
+
+        long maxSegmentMs = 300000;
+        MockTime time = new MockTime();
+        try (LogSegment seg = createSegment(0L, time)) {
+            seg.timeIndex(); // Force load indexes before closing the segment
+            seg.offsetIndex();
+            seg.close();
+
+            LogSegment reopened = createSegment(0L, time);
+            assertEquals(0, seg.timeIndex().sizeInBytes());
+            assertEquals(0, seg.offsetIndex().sizeInBytes());
+
+            time.sleep(500);
+            reopened.truncateTo(57);
+            assertEquals(0, reopened.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP));
+            assertFalse(reopened.timeIndex().isFull());
+            assertFalse(reopened.offsetIndex().isFull());
+
+            RollParams rollParams = new RollParams(maxSegmentMs, Integer.MAX_VALUE, RecordBatch.NO_TIMESTAMP,
+                100L, 1024, time.milliseconds());
+            assertFalse(reopened.shouldRoll(rollParams));
+
+            // the segment should not be rolled even if maxSegmentMs has been exceeded
+            time.sleep(maxSegmentMs + 1);
+            assertEquals(maxSegmentMs + 1, reopened.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP));
+            rollParams = new RollParams(maxSegmentMs, Integer.MAX_VALUE, RecordBatch.NO_TIMESTAMP, 100L, 1024, time.milliseconds());
+            assertFalse(reopened.shouldRoll(rollParams));
+
+            // but we should still roll the segment if we cannot fit the next offset
+            rollParams = new RollParams(maxSegmentMs, Integer.MAX_VALUE, RecordBatch.NO_TIMESTAMP,
+                (long) Integer.MAX_VALUE + 200L, 1024, time.milliseconds());
+            assertTrue(reopened.shouldRoll(rollParams));
+        }
+    }
+
+    @Test
+    public void testReloadLargestTimestampAndNextOffsetAfterTruncation() throws IOException {
+        int numMessages = 30;
+        try (LogSegment seg = createSegment(40, 2 * records(0, "hello").sizeInBytes() - 1)) {
+            int offset = 40;
+            for (int i = 0; i < numMessages; i++) {
+                seg.append(offset, offset, offset, records(offset, "hello"));
+                offset++;
+            }
+            assertEquals(offset, seg.readNextOffset());
+
+            int expectedNumEntries = numMessages / 2 - 1;
+            assertEquals(expectedNumEntries, seg.timeIndex().entries(), String.format("Should have %d time indexes", expectedNumEntries));
+
+            seg.truncateTo(41);
+            assertEquals(0, seg.timeIndex().entries(), "Should have 0 time indexes");
+            assertEquals(400L, seg.largestTimestamp(), "Largest timestamp should be 400");
+            assertEquals(41, seg.readNextOffset());
+        }
+    }
+
+    /**
+     * Test truncating the whole segment, and check that we can reappend with the original offset.
+     */
+    @Test
+    public void testTruncateFull() throws IOException {
+        MockTime time = new MockTime();
+        try (LogSegment seg = createSegment(40, time)) {
+
+            seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there"));
+
+            // If the segment is empty after truncation, the create time should be reset
+            time.sleep(500);
+            assertEquals(500, seg.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP));
+
+            seg.truncateTo(0);
+            assertEquals(0, seg.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP));
+            assertFalse(seg.timeIndex().isFull());
+            assertFalse(seg.offsetIndex().isFull());
+            assertNull(seg.read(0, 1024), "Segment should be empty.");
+
+            seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there"));
+        }
+    }
+
+    /**
+     * Append messages with timestamp and search message by timestamp.
+     */
+    @Test
+    public void testFindOffsetByTimestamp() throws IOException {
+        int messageSize = records(0, "msg00").sizeInBytes();
+        try (LogSegment seg = createSegment(40, messageSize * 2 - 1)) {
+            // Produce some messages
+            for (int i = 40; i < 50; i++) {
+                seg.append(i, i * 10, i, records(i, "msg" + i));
+            }
+
+            assertEquals(490, seg.largestTimestamp());
+            // Search for an indexed timestamp
+            assertEquals(42, seg.findOffsetByTimestamp(420, 0L).get().offset);
+            assertEquals(43, seg.findOffsetByTimestamp(421, 0L).get().offset);
+            // Search for an un-indexed timestamp
+            assertEquals(43, seg.findOffsetByTimestamp(430, 0L).get().offset);
+            assertEquals(44, seg.findOffsetByTimestamp(431, 0L).get().offset);
+            // Search beyond the last timestamp
+            assertEquals(Optional.empty(), seg.findOffsetByTimestamp(491, 0L));
+            // Search before the first indexed timestamp
+            assertEquals(41, seg.findOffsetByTimestamp(401, 0L).get().offset);
+            // Search before the first timestamp
+            assertEquals(40, seg.findOffsetByTimestamp(399, 0L).get().offset);
+        }
+    }
+
+    /**
+     * Test that offsets are assigned sequentially and that the nextOffset variable is incremented
+     */
+    @Test
+    public void testNextOffsetCalculation() throws IOException {
+        try (LogSegment seg = createSegment(40)) {
+            assertEquals(40, seg.readNextOffset());
+            seg.append(52, RecordBatch.NO_TIMESTAMP, -1L, records(50, "hello", "there", "you"));
+            assertEquals(53, seg.readNextOffset());
+        }
+    }
+
+    /**
+     * Test that we can change the file suffixes for the log and index files
+     */
+    @Test
+    public void testChangeFileSuffixes() throws IOException {
+        try (LogSegment seg = createSegment(40)) {
+            File logFile = seg.log().file();
+            File indexFile = seg.offsetIndexFile();
+            File timeIndexFile = seg.timeIndexFile();
+            // Ensure that files for offset and time indices have not been created eagerly.
+            assertFalse(seg.offsetIndexFile().exists());
+            assertFalse(seg.timeIndexFile().exists());
+            seg.changeFileSuffixes("", ".deleted");
+            // Ensure that attempt to change suffixes for non-existing offset and time indices does not create new files.
+            assertFalse(seg.offsetIndexFile().exists());
+            assertFalse(seg.timeIndexFile().exists());
+            // Ensure that file names are updated accordingly.
+            assertEquals(logFile.getAbsolutePath() + ".deleted", seg.log().file().getAbsolutePath());
+            assertEquals(indexFile.getAbsolutePath() + ".deleted", seg.offsetIndexFile().getAbsolutePath());
+            assertEquals(timeIndexFile.getAbsolutePath() + ".deleted", seg.timeIndexFile().getAbsolutePath());
+            assertTrue(seg.log().file().exists());
+            // Ensure lazy creation of offset index file upon accessing it.
+            seg.offsetIndex();
+            assertTrue(seg.offsetIndexFile().exists());
+            // Ensure lazy creation of time index file upon accessing it.
+            seg.timeIndex();
+            assertTrue(seg.timeIndexFile().exists());
+        }
+    }
+
+    /**
+     * Create a segment with some data and an index. Then corrupt the index,
+     * and recover the segment, the entries should all be readable.
+     */
+    @Test
+    public void testRecoveryFixesCorruptIndex() throws Exception {
+        try (LogSegment seg = createSegment(0)) {
+            for (int i = 0; i < 100; i++) {
+                seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, records(i, Integer.toString(i)));
+            }
+            File indexFile = seg.offsetIndexFile();
+            writeNonsenseToFile(indexFile, 5, (int) indexFile.length());
+            seg.recover(newProducerStateManager(), Optional.empty());
+            for (int i = 0; i < 100; i++) {
+                Iterable<Record> records = seg.read(i, 1, Optional.of((long) seg.size()), true).records.records();
+                assertEquals(i, records.iterator().next().offset());
+            }
+        }
+    }
+
+    @Test
+    public void testRecoverTransactionIndex() throws Exception {
+        try (LogSegment segment = createSegment(100)) {
+            short producerEpoch = 0;
+            int partitionLeaderEpoch = 15;
+            int sequence = 100;
+
+            long pid1 = 5L;
+            long pid2 = 10L;
+
+            // append transactional records from pid1
+            segment.append(101L, RecordBatch.NO_TIMESTAMP,
+                100L, MemoryRecords.withTransactionalRecords(100L, Compression.NONE,
+                    pid1, producerEpoch, sequence, partitionLeaderEpoch, new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())));
+
+            // append transactional records from pid2
+            segment.append(103L, RecordBatch.NO_TIMESTAMP,
+                102L, MemoryRecords.withTransactionalRecords(102L, Compression.NONE,
+                    pid2, producerEpoch, sequence, partitionLeaderEpoch, new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())));
+
+            // append non-transactional records
+            segment.append(105L, RecordBatch.NO_TIMESTAMP,
+                104L, MemoryRecords.withRecords(104L, Compression.NONE,
+                    partitionLeaderEpoch, new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())));
+
+            // abort the transaction from pid2
+            segment.append(106L, RecordBatch.NO_TIMESTAMP,
+                106L, endTxnRecords(ControlRecordType.ABORT, pid2, producerEpoch, 106L));
+
+            // commit the transaction from pid1
+            segment.append(107L, RecordBatch.NO_TIMESTAMP,
+                107L, endTxnRecords(ControlRecordType.COMMIT, pid1, producerEpoch, 107L));
+
+            ProducerStateManager stateManager = newProducerStateManager();
+            segment.recover(stateManager, Optional.empty());
+            assertEquals(108L, stateManager.mapEndOffset());
+
+            List<AbortedTxn> abortedTxns = segment.txnIndex().allAbortedTxns();
+            assertEquals(1, abortedTxns.size());
+            AbortedTxn abortedTxn = abortedTxns.get(0);
+            assertEquals(pid2, abortedTxn.producerId());
+            assertEquals(102L, abortedTxn.firstOffset());
+            assertEquals(106L, abortedTxn.lastOffset());
+            assertEquals(100L, abortedTxn.lastStableOffset());
+
+            // recover again, assuming the transaction from pid2 began on a previous segment
+            stateManager = newProducerStateManager();
+            stateManager.loadProducerEntry(new ProducerStateEntry(pid2, producerEpoch, 0,
+                RecordBatch.NO_TIMESTAMP, OptionalLong.of(75L),
+                Optional.of(new BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP))));
+            segment.recover(stateManager, Optional.empty());
+            assertEquals(108L, stateManager.mapEndOffset());
+
+            abortedTxns = segment.txnIndex().allAbortedTxns();
+            assertEquals(1, abortedTxns.size());
+            abortedTxn = abortedTxns.get(0);
+            assertEquals(pid2, abortedTxn.producerId());
+            assertEquals(75L, abortedTxn.firstOffset());
+            assertEquals(106L, abortedTxn.lastOffset());
+            assertEquals(100L, abortedTxn.lastStableOffset());
+        }
+    }
+
+    /**
+     * Create a segment with some data, then recover the segment.
+     * The epoch cache entries should reflect the segment.
+     */
+    @Test
+    public void testRecoveryRebuildsEpochCache() throws Exception {
+        try (LogSegment seg = createSegment(0)) {
+            LeaderEpochCheckpointFile checkpoint = new LeaderEpochCheckpointFile(TestUtils.tempFile(), new LogDirFailureChannel(1));
+
+            LeaderEpochFileCache cache = new LeaderEpochFileCache(topicPartition, checkpoint, new MockScheduler(new MockTime()));
+            seg.append(105L, RecordBatch.NO_TIMESTAMP, 104L, MemoryRecords.withRecords(104L, Compression.NONE, 0,
+                new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())));
+
+            seg.append(107L, RecordBatch.NO_TIMESTAMP, 106L, MemoryRecords.withRecords(106L, Compression.NONE, 1,
+                new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())));
+
+            seg.append(109L, RecordBatch.NO_TIMESTAMP, 108L, MemoryRecords.withRecords(108L, Compression.NONE, 1,
+                new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())));
+
+            seg.append(111L, RecordBatch.NO_TIMESTAMP, 110L, MemoryRecords.withRecords(110L, Compression.NONE, 2,
+                new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())));
+
+            seg.recover(newProducerStateManager(), Optional.of(cache));
+            assertEquals(Arrays.asList(
+                new EpochEntry(0, 104L),
+                new EpochEntry(1, 106L),
+                new EpochEntry(2, 110L)), cache.epochEntries());
+        }
+    }
+
+    private MemoryRecords endTxnRecords(
+        ControlRecordType controlRecordType,
+        long producerId,
+        short producerEpoch,
+        long offset) {
+
+        EndTransactionMarker marker = new EndTransactionMarker(controlRecordType, 0);
+        return MemoryRecords.withEndTransactionMarker(
+            offset,
+            RecordBatch.NO_TIMESTAMP,
+            0,
+            producerId,
+            producerEpoch,
+            marker
+        );
+    }
+
+    /**
+     * Create a segment with some data and an index. Then corrupt the index,
+     * and recover the segment, the entries should all be readable.
+     */
+    @Test
+    public void testRecoveryFixesCorruptTimeIndex() throws IOException {
+        try (LogSegment seg = createSegment(0)) {
+            for (int i = 0; i < 100; i++) {
+                seg.append(i, i * 10, i, records(i, String.valueOf(i)));
+            }
+            File timeIndexFile = seg.timeIndexFile();
+            writeNonsenseToFile(timeIndexFile, 5, (int) timeIndexFile.length());
+            seg.recover(newProducerStateManager(), Optional.empty());
+            for (int i = 0; i < 100; i++) {
+                assertEquals(i, seg.findOffsetByTimestamp(i * 10, 0L).get().offset);
+                if (i < 99) {
+                    assertEquals(i + 1, seg.findOffsetByTimestamp(i * 10 + 1, 0L).get().offset);
+                }
+            }
+        }
+    }
+
+    /**
+     * Randomly corrupt a log a number of times and attempt recovery.
+     */
+    @Test
+    public void testRecoveryWithCorruptMessage() throws IOException {
+        int messagesAppended = 20;
+        for (int ignore = 0; ignore < 10; ignore++) {
+            try (LogSegment seg = createSegment(0)) {
+                for (int i = 0; i < messagesAppended; i++) {
+                    seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, records(i, String.valueOf(i)));
+                }
+                int offsetToBeginCorruption = TestUtils.RANDOM.nextInt(messagesAppended);
+                // start corrupting somewhere in the middle of the chosen record all the way to the end
+
+                FileRecords.LogOffsetPosition recordPosition = seg.log().searchForOffsetWithSize(offsetToBeginCorruption, 0);
+                int position = recordPosition.position + TestUtils.RANDOM.nextInt(15);
+                writeNonsenseToFile(seg.log().file(), position, (int) (seg.log().file().length() - position));
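+                // recovery should detect the corruption and truncate the log at the first bad batch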
+                seg.recover(newProducerStateManager(), Optional.empty());
+
+                List<Long> expectList = new ArrayList<>();
+                for (long j = 0; j < offsetToBeginCorruption; j++) {
+                    expectList.add(j);
+                }
+                List<Long> actualList = new ArrayList<>();
+                for (FileLogInputStream.FileChannelRecordBatch batch : seg.log().batches()) {
+                    actualList.add(batch.lastOffset());
+                }
+                assertEquals(expectList, actualList, "Should have truncated off bad messages.");
+                seg.deleteIfExists();
+            }
+        }
+    }
+
+    /* create a segment with preallocation, append messages to it, and verify them */
+    @Test
+    public void testCreateWithInitFileSizeAppendMessage() throws IOException {
+        File tempDir = TestUtils.tempDirectory();
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, 10);
+        configMap.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1000);
+        configMap.put(TopicConfig.SEGMENT_JITTER_MS_CONFIG, 0);
+        LogConfig logConfig = new LogConfig(configMap);
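+        // open a new segment with a preallocated 512 MB initial file size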
+        try (LogSegment seg = LogSegment.open(tempDir, 40, logConfig, Time.SYSTEM, false,
+            512 * 1024 * 1024, true, "")) {
+            segments.add(seg);
+            MemoryRecords ms = records(50, "hello", "there");
+            seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
+            MemoryRecords ms2 = records(60, "alpha", "beta");
+            seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2);
+            FetchDataInfo read = seg.read(55, 200);
+            checkEquals(ms2.records().iterator(), read.records.records().iterator());
+        }
+    }
+
+    /* create a segment with preallocation, shut it down cleanly, and verify it can be reopened */
+    @Test
+    public void testCreateWithInitFileSizeClearShutdown() throws IOException {
+        // Create a temporary directory
+        File tempDir = TestUtils.tempDirectory();
+
+        // Set up the log configuration
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, 10);
+        configMap.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1000);
+        configMap.put(TopicConfig.SEGMENT_JITTER_MS_CONFIG, 0);
+        LogConfig logConfig = new LogConfig(configMap);
+
+        try (LogSegment seg = LogSegment.open(tempDir, 40, logConfig, Time.SYSTEM, 512 * 1024 * 1024, true)) {
+            MemoryRecords ms = records(50, "hello", "there");
+            seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
+            MemoryRecords ms2 = records(60, "alpha", "beta");
+            seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2);
+            FetchDataInfo read = seg.read(55, 200);
+            checkEquals(ms2.records().iterator(), read.records.records().iterator());
+            long oldSize = seg.log().sizeInBytes();
+            long oldPosition = seg.log().channel().position();
+            long oldFileSize = seg.log().file().length();
+            assertEquals(512 * 1024 * 1024, oldFileSize);
+            seg.close();
+            // After close, file should be trimmed
+            assertEquals(oldSize, seg.log().file().length());
+
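+            // reopen the segment and verify that size, channel position, and file length survive the clean shutdown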
+            LogSegment segReopen = LogSegment.open(tempDir, 40, logConfig, Time.SYSTEM,
+                true, 512 * 1024 * 1024, true, "");
+            segments.add(segReopen);
+
+            FetchDataInfo readAgain = segReopen.read(55, 200);
+            checkEquals(ms2.records().iterator(), readAgain.records.records().iterator());
+            long size = segReopen.log().sizeInBytes();
+            long position = segReopen.log().channel().position();
+            long fileSize = segReopen.log().file().length();
+            assertEquals(oldPosition, position);
+            assertEquals(oldSize, size);
+            assertEquals(size, fileSize);
+        }
+    }
+
+    private MemoryRecords recordsForTruncateEven(long offset, String record) {
+        return MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, offset, Compression.NONE,
+            TimestampType.CREATE_TIME, new SimpleRecord(offset * 1000, record.getBytes()));
+    }
+
+    @Test
+    public void shouldTruncateEvenIfOffsetPointsToAGapInTheLog() throws IOException {
+        try (LogSegment seg = createSegment(40)) {
+            long offset = 40;
+
+            // Given two messages with a gap between them (e.g. mid offset compacted away)
+            MemoryRecords ms1 = recordsForTruncateEven(offset, "first message");
+            seg.append(offset, RecordBatch.NO_TIMESTAMP, -1L, ms1);
+            MemoryRecords ms2 = recordsForTruncateEven(offset + 3, "message after gap");
+            seg.append(offset + 3, RecordBatch.NO_TIMESTAMP, -1L, ms2);
+
+            // When we truncate to an offset without a corresponding log entry
+            seg.truncateTo(offset + 1);
+
+            // Then we should still truncate the record that was present (i.e. offset + 3 is gone)
+            FetchDataInfo log = seg.read(offset, 10000);
+            Iterator<? extends RecordBatch> iter = log.records.batches().iterator();
+            assertEquals(offset, iter.next().baseOffset());
+            assertFalse(iter.hasNext());
+        }
+    }
+
+    private MemoryRecords records(long offset, int size) {
+        return MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, offset, Compression.NONE, TimestampType.CREATE_TIME,
+            new SimpleRecord(new byte[size]));
+    }
+
+    @Test
+    public void testAppendFromFile() throws IOException {
+        // create a log file in a separate directory to avoid conflicting with created segments
+        File tempDir = TestUtils.tempDirectory();
+        FileRecords fileRecords = FileRecords.open(LogFileUtils.logFile(tempDir, 0));
+
+        // Simulate a scenario with a log offset range exceeding Integer.MAX_VALUE
+        fileRecords.append(records(0, 1024));
+        fileRecords.append(records(500, 1024 * 1024 + 1));
+        long sizeBeforeOverflow = fileRecords.sizeInBytes();
+        fileRecords.append(records(Integer.MAX_VALUE + 5L, 1024));
+        long sizeAfterOverflow = fileRecords.sizeInBytes();
+
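+        // appendFromFile should append only the batches whose offsets fit the segment's relative offset range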
+        try (LogSegment segment = createSegment(0)) {
+            long bytesAppended = segment.appendFromFile(fileRecords, 0);
+            assertEquals(sizeBeforeOverflow, bytesAppended);
+            assertEquals(sizeBeforeOverflow, segment.size());
+        }
+
+        try (LogSegment overflowSegment = createSegment(Integer.MAX_VALUE)) {
+            long overflowBytesAppended = overflowSegment.appendFromFile(fileRecords, (int) sizeBeforeOverflow);
+            assertEquals(sizeAfterOverflow - sizeBeforeOverflow, overflowBytesAppended);
+            assertEquals(overflowBytesAppended, overflowSegment.size());
+        }
+
+        Utils.delete(tempDir);
+    }
+
+    @Test
+    public void testGetFirstBatchTimestamp() throws IOException {
+        try (LogSegment segment = createSegment(1)) {
+            assertEquals(Long.MAX_VALUE, segment.getFirstBatchTimestamp());
+
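+            // once a batch is appended, its timestamp becomes the segment's first batch timestamp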
+            segment.append(1, 1000L, 1, MemoryRecords.withRecords(1, Compression.NONE, new SimpleRecord("one".getBytes())));
+            assertEquals(1000L, segment.getFirstBatchTimestamp());
+        }
+    }
+
+    @Test
+    public void testDeleteIfExistsWithGetParentIsNull() throws IOException {
+        FileRecords log = mock(FileRecords.class);
+        @SuppressWarnings("unchecked")
+        LazyIndex<OffsetIndex> lazyOffsetIndex = mock(LazyIndex.class);
+        @SuppressWarnings("unchecked")
+        LazyIndex<TimeIndex> lazyTimeIndex = mock(LazyIndex.class);
+        TransactionIndex transactionIndex = mock(TransactionIndex.class);
+
+
+        // Use Mockito's when().thenReturn() for stubbing
+        when(log.deleteIfExists()).thenReturn(true);
+        when(lazyOffsetIndex.deleteIfExists()).thenReturn(true);
+        when(lazyTimeIndex.deleteIfExists()).thenReturn(true);
+        when(transactionIndex.deleteIfExists()).thenReturn(false);
+
+        File mockFile = mock(File.class);
+        when(mockFile.getAbsolutePath()).thenReturn("/test/path");
+        when(log.file()).thenReturn(mockFile);
+        when(lazyOffsetIndex.file()).thenReturn(mockFile);
+        when(lazyTimeIndex.file()).thenReturn(mockFile);
+
+        File transactionIndexFile = new File("/");
+        when(transactionIndex.file()).thenReturn(transactionIndexFile);
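+        // new File("/") has no parent directory, which is the case this test exercises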
+
+        try (LogSegment segment = new LogSegment(log, lazyOffsetIndex, lazyTimeIndex, transactionIndex, 0, 10, 100, Time.SYSTEM)) {
+            assertDoesNotThrow(
+                () -> segment.deleteIfExists(),
+                "Should not throw exception when 
transactionIndex.deleteIfExists() returns false");
+        }
+    }
+
+    private ProducerStateManager newProducerStateManager() throws IOException {
+        return new ProducerStateManager(
+            topicPartition,
+            logDir,
+            (int) (Duration.ofMinutes(5).toMillis()),
+            new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false),
+            new MockTime()
+        );
+    }
+
+    private void checkEquals(Iterator<?> s1, Iterator<?> s2) {
+        while (s1.hasNext() && s2.hasNext()) {
+            assertEquals(s1.next(), s2.next());
+        }
+        assertFalse(s1.hasNext(), "Iterators have uneven length--first has more");
+        assertFalse(s2.hasNext(), "Iterators have uneven length--second has more");
+    }
+
+    private void writeNonsenseToFile(File file, long position, int size) throws IOException {
+        try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {
+            raf.seek(position);
+            for (int i = 0; i < size; i++) {
+                raf.writeByte(TestUtils.RANDOM.nextInt(255));
+            }
+        }
+    }
+}
