This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 06a25151094 KAFKA-19752 Move parts of UnifiedLogTest to storage module 
(#21644)
06a25151094 is described below

commit 06a25151094fa9c58f0baf7c43b5c123908c21f7
Author: Mickael Maison <[email protected]>
AuthorDate: Mon Mar 9 10:29:54 2026 +0100

    KAFKA-19752 Move parts of UnifiedLogTest to storage module (#21644)
    
    UnifiedLogTest is ~4000 lines, let's convert it in smaller chunks.  Here
    is roughly the first 1000 lines.
    
    Reviewers: Christo Lolov <[email protected]>, Kuan-Po Tseng
    <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala |  934 +-----------------
 .../kafka/storage/internals/log/LogTestUtils.java  |   93 +-
 .../storage/internals/log/UnifiedLogTest.java      | 1028 +++++++++++++++++++-
 3 files changed, 1113 insertions(+), 942 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala 
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index c54f587d452..f4dc46b2f39 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -27,11 +27,10 @@ import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.FetchResponseData
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.record.internal.FileRecords.TimestampAndOffset
-import org.apache.kafka.common.record.internal.MemoryRecords.RecordFilter
 import org.apache.kafka.common.record.internal._
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.requests.{ListOffsetsRequest, 
ListOffsetsResponse}
-import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
+import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
 import 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
@@ -39,10 +38,10 @@ import 
org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager,
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, 
DelayedRemoteListOffsets}
 import org.apache.kafka.server.storage.log.{FetchIsolation, 
UnexpectedAppendOffsetException}
-import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler}
+import org.apache.kafka.server.util.{MockTime, Scheduler}
 import 
org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, 
PartitionMetadataFile}
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, 
AsyncOffsetReader, Cleaner, EpochEntry, LogConfig, LogFileUtils, 
LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, 
LogSegments, LogStartOffsetIncrementReason, LogToClean, OffsetResultHolder, 
OffsetsOutOfOrderException, ProducerStateManager, ProducerStateManagerConfig, 
RecordValidationException, UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, 
AsyncOffsetReader, Cleaner, LogConfig, LogFileUtils, LogOffsetMetadata, 
LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, 
LogStartOffsetIncrementReason, LogToClean, OffsetResultHolder, 
OffsetsOutOfOrderException, ProducerStateManager, ProducerStateManagerConfig, 
RecordValidationException, UnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.internals.utils.Throttler
 import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, 
BrokerTopicStats}
 import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, _}
@@ -64,7 +63,6 @@ import java.nio.file.Files
 import java.util
 import java.util.concurrent.{Callable, ConcurrentHashMap, Executors, TimeUnit}
 import java.util.{Optional, OptionalLong, Properties}
-import scala.collection.immutable.SortedSet
 import scala.collection.mutable.ListBuffer
 import scala.jdk.CollectionConverters._
 
@@ -98,932 +96,6 @@ class UnifiedLogTest {
     }
   }
 
-  @Test
-  def testHighWatermarkMetadataUpdatedAfterSegmentRoll(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
-    val log = createLog(logDir, logConfig)
-
-    def assertFetchSizeAndOffsets(fetchOffset: Long,
-                                  expectedSize: Int,
-                                  expectedOffsets: Seq[Long]): Unit = {
-      val readInfo = log.read(
-        fetchOffset,
-        2048,
-        FetchIsolation.HIGH_WATERMARK,
-        false)
-      assertEquals(expectedSize, readInfo.records.sizeInBytes)
-      assertEquals(expectedOffsets, 
readInfo.records.records.asScala.map(_.offset))
-    }
-
-    val records = TestUtils.records(List(
-      new SimpleRecord(mockTime.milliseconds, "a".getBytes, "value".getBytes),
-      new SimpleRecord(mockTime.milliseconds, "b".getBytes, "value".getBytes),
-      new SimpleRecord(mockTime.milliseconds, "c".getBytes, "value".getBytes)
-    ))
-
-    log.appendAsLeader(records, 0)
-    assertFetchSizeAndOffsets(fetchOffset = 0L, 0, Seq())
-
-    log.maybeIncrementHighWatermark(log.logEndOffsetMetadata)
-    assertFetchSizeAndOffsets(fetchOffset = 0L, records.sizeInBytes, Seq(0, 1, 
2))
-
-    log.roll()
-    assertFetchSizeAndOffsets(fetchOffset = 0L, records.sizeInBytes, Seq(0, 1, 
2))
-
-    log.appendAsLeader(records, 0)
-    assertFetchSizeAndOffsets(fetchOffset = 3L, 0, Seq())
-  }
-
-  @Test
-  def testAppendAsLeaderWithRaftLeader(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
-    val log = createLog(logDir, logConfig)
-    val leaderEpoch = 0
-
-    def records(offset: Long): MemoryRecords = TestUtils.records(List(
-      new SimpleRecord(mockTime.milliseconds, "a".getBytes, "value".getBytes),
-      new SimpleRecord(mockTime.milliseconds, "b".getBytes, "value".getBytes),
-      new SimpleRecord(mockTime.milliseconds, "c".getBytes, "value".getBytes)
-    ), baseOffset = offset, partitionLeaderEpoch = leaderEpoch)
-
-    log.appendAsLeader(records(0), leaderEpoch, AppendOrigin.RAFT_LEADER)
-    assertEquals(0, log.logStartOffset)
-    assertEquals(3L, log.logEndOffset)
-
-    // Since raft leader is responsible for assigning offsets, and the 
LogValidator is bypassed from the performance perspective,
-    // so the first offset of the MemoryRecords to be append should equal to 
the next offset in the log
-    assertThrows(classOf[UnexpectedAppendOffsetException], () => 
log.appendAsLeader(records(1), leaderEpoch, AppendOrigin.RAFT_LEADER))
-
-    // When the first offset of the MemoryRecords to be append equals to the 
next offset in the log, append will succeed
-    log.appendAsLeader(records(3), leaderEpoch, AppendOrigin.RAFT_LEADER)
-    assertEquals(6, log.logEndOffset)
-  }
-
-  @Test
-  def testAppendInfoFirstOffset(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
-    val log = createLog(logDir, logConfig)
-
-    val simpleRecords = List(
-      new SimpleRecord(mockTime.milliseconds, "a".getBytes, "value".getBytes),
-      new SimpleRecord(mockTime.milliseconds, "b".getBytes, "value".getBytes),
-      new SimpleRecord(mockTime.milliseconds, "c".getBytes, "value".getBytes)
-    )
-
-    val records = TestUtils.records(simpleRecords)
-
-    val firstAppendInfo = log.appendAsLeader(records, 0)
-    assertEquals(0, firstAppendInfo.firstOffset)
-
-    val secondAppendInfo = log.appendAsLeader(
-      TestUtils.records(simpleRecords),
-      0
-    )
-    assertEquals(simpleRecords.size, secondAppendInfo.firstOffset)
-
-    log.roll()
-    val afterRollAppendInfo =  
log.appendAsLeader(TestUtils.records(simpleRecords), 0)
-    assertEquals(simpleRecords.size * 2, afterRollAppendInfo.firstOffset)
-  }
-
-  @Test
-  def testTruncateBelowFirstUnstableOffset(): Unit = {
-    testTruncateBelowFirstUnstableOffset((log, targetOffset) => 
log.truncateTo(targetOffset))
-  }
-
-  @Test
-  def testTruncateFullyAndStartBelowFirstUnstableOffset(): Unit = {
-    testTruncateBelowFirstUnstableOffset((log, targetOffset) => 
log.truncateFullyAndStartAt(targetOffset, Optional.empty))
-  }
-
-  @Test
-  def testTruncateFullyAndStart(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
-    val log = createLog(logDir, logConfig)
-
-    val producerId = 17L
-    val producerEpoch: Short = 10
-    val sequence = 0
-
-    log.appendAsLeader(TestUtils.records(List(
-      new SimpleRecord("0".getBytes),
-      new SimpleRecord("1".getBytes),
-      new SimpleRecord("2".getBytes)
-    )), 0)
-
-    log.appendAsLeader(MemoryRecords.withTransactionalRecords(
-      Compression.NONE,
-      producerId,
-      producerEpoch,
-      sequence,
-      new SimpleRecord("3".getBytes),
-      new SimpleRecord("4".getBytes)
-    ), 0)
-
-    assertEquals(Optional.of(3L), log.firstUnstableOffset)
-
-    // We close and reopen the log to ensure that the first unstable offset 
segment
-    // position will be undefined when we truncate the log.
-    log.close()
-
-    val reopened = createLog(logDir, logConfig)
-    assertEquals(Optional.of(new LogOffsetMetadata(3L)), 
reopened.producerStateManager.firstUnstableOffset)
-
-    reopened.truncateFullyAndStartAt(2L, Optional.of(1L))
-    assertEquals(Optional.empty, reopened.firstUnstableOffset)
-    assertEquals(util.Map.of, reopened.producerStateManager.activeProducers)
-    assertEquals(1L, reopened.logStartOffset)
-    assertEquals(2L, reopened.logEndOffset)
-  }
-
-  private def testTruncateBelowFirstUnstableOffset(truncateFunc: (UnifiedLog, 
Long) => Unit): Unit = {
-    // Verify that truncation below the first unstable offset correctly
-    // resets the producer state. Specifically we are testing the case when
-    // the segment position of the first unstable offset is unknown.
-
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
-    val log = createLog(logDir, logConfig)
-
-    val producerId = 17L
-    val producerEpoch: Short = 10
-    val sequence = 0
-
-    log.appendAsLeader(TestUtils.records(List(
-      new SimpleRecord("0".getBytes),
-      new SimpleRecord("1".getBytes),
-      new SimpleRecord("2".getBytes)
-    )), 0)
-
-    log.appendAsLeader(MemoryRecords.withTransactionalRecords(
-      Compression.NONE,
-      producerId,
-      producerEpoch,
-      sequence,
-      new SimpleRecord("3".getBytes),
-      new SimpleRecord("4".getBytes)
-    ), 0)
-
-    assertEquals(Optional.of(3L), log.firstUnstableOffset)
-
-    // We close and reopen the log to ensure that the first unstable offset 
segment
-    // position will be undefined when we truncate the log.
-    log.close()
-
-    val reopened = createLog(logDir, logConfig)
-    assertEquals(Optional.of(new LogOffsetMetadata(3L)), 
reopened.producerStateManager.firstUnstableOffset)
-
-    truncateFunc(reopened, 0L)
-    assertEquals(Optional.empty, reopened.firstUnstableOffset)
-    assertEquals(util.Map.of, reopened.producerStateManager.activeProducers)
-  }
-
-  @Test
-  def testHighWatermarkMaintenance(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
-    val log = createLog(logDir, logConfig)
-    val leaderEpoch = 0
-
-    def records(offset: Long): MemoryRecords = TestUtils.records(List(
-      new SimpleRecord(mockTime.milliseconds, "a".getBytes, "value".getBytes),
-      new SimpleRecord(mockTime.milliseconds, "b".getBytes, "value".getBytes),
-      new SimpleRecord(mockTime.milliseconds, "c".getBytes, "value".getBytes)
-    ), baseOffset = offset, partitionLeaderEpoch= leaderEpoch)
-
-    def assertHighWatermark(offset: Long): Unit = {
-      assertEquals(offset, log.highWatermark)
-      assertValidLogOffsetMetadata(log, log.fetchOffsetSnapshot.highWatermark)
-    }
-
-    // High watermark initialized to 0
-    assertHighWatermark(0L)
-
-    // High watermark not changed by append
-    log.appendAsLeader(records(0), leaderEpoch)
-    assertHighWatermark(0L)
-
-    // Update high watermark as leader
-    log.maybeIncrementHighWatermark(new LogOffsetMetadata(1L))
-    assertHighWatermark(1L)
-
-    // Cannot update past the log end offset
-    log.updateHighWatermark(5L)
-    assertHighWatermark(3L)
-
-    // Update high watermark as follower
-    log.appendAsFollower(records(3L), leaderEpoch)
-    log.updateHighWatermark(6L)
-    assertHighWatermark(6L)
-
-    // High watermark should be adjusted by truncation
-    log.truncateTo(3L)
-    assertHighWatermark(3L)
-
-    log.appendAsLeader(records(0L), 0)
-    assertHighWatermark(3L)
-    assertEquals(6L, log.logEndOffset)
-    assertEquals(0L, log.logStartOffset)
-
-    // Full truncation should also reset high watermark
-    log.truncateFullyAndStartAt(4L, Optional.empty)
-    assertEquals(4L, log.logEndOffset)
-    assertEquals(4L, log.logStartOffset)
-    assertHighWatermark(4L)
-  }
-
-  private def assertNonEmptyFetch(log: UnifiedLog, offset: Long, isolation: 
FetchIsolation, batchBaseOffset: Long): Unit = {
-    val readInfo = log.read(offset, Int.MaxValue, isolation, true)
-
-    assertFalse(readInfo.firstEntryIncomplete)
-    assertTrue(readInfo.records.sizeInBytes > 0)
-
-    val upperBoundOffset = isolation match {
-      case FetchIsolation.LOG_END => log.logEndOffset
-      case FetchIsolation.HIGH_WATERMARK => log.highWatermark
-      case FetchIsolation.TXN_COMMITTED => log.lastStableOffset
-    }
-
-    for (record <- readInfo.records.records.asScala)
-      assertTrue(record.offset < upperBoundOffset)
-
-    assertEquals(batchBaseOffset, readInfo.fetchOffsetMetadata.messageOffset)
-    assertValidLogOffsetMetadata(log, readInfo.fetchOffsetMetadata)
-  }
-
-  private def assertEmptyFetch(log: UnifiedLog, offset: Long, isolation: 
FetchIsolation, batchBaseOffset: Long): Unit = {
-    val readInfo = log.read(offset, Int.MaxValue, isolation, true)
-    assertFalse(readInfo.firstEntryIncomplete)
-    assertEquals(0, readInfo.records.sizeInBytes)
-    assertEquals(batchBaseOffset, readInfo.fetchOffsetMetadata.messageOffset)
-    assertValidLogOffsetMetadata(log, readInfo.fetchOffsetMetadata)
-  }
-
-  @Test
-  def testFetchUpToLogEndOffset(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
-    val log = createLog(logDir, logConfig)
-
-    log.appendAsLeader(TestUtils.records(List(
-      new SimpleRecord("0".getBytes),
-      new SimpleRecord("1".getBytes),
-      new SimpleRecord("2".getBytes)
-    )), 0)
-    log.appendAsLeader(TestUtils.records(List(
-      new SimpleRecord("3".getBytes),
-      new SimpleRecord("4".getBytes)
-    )), 0)
-    val batchBaseOffsets = SortedSet[Long](0, 3, 5)
-
-    (log.logStartOffset until log.logEndOffset).foreach { offset =>
-      val batchBaseOffset = batchBaseOffsets.rangeTo(offset).lastKey
-      assertNonEmptyFetch(log, offset, FetchIsolation.LOG_END, batchBaseOffset)
-    }
-  }
-
-  @Test
-  def testFetchUpToHighWatermark(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
-    val log = createLog(logDir, logConfig)
-
-    log.appendAsLeader(TestUtils.records(List(
-      new SimpleRecord("0".getBytes),
-      new SimpleRecord("1".getBytes),
-      new SimpleRecord("2".getBytes)
-    )), 0)
-    log.appendAsLeader(TestUtils.records(List(
-      new SimpleRecord("3".getBytes),
-      new SimpleRecord("4".getBytes)
-    )), 0)
-    val batchBaseOffsets = SortedSet[Long](0, 3, 5)
-
-    def assertHighWatermarkBoundedFetches(): Unit = {
-      (log.logStartOffset until log.highWatermark).foreach { offset =>
-        val batchBaseOffset = batchBaseOffsets.rangeTo(offset).lastKey
-        assertNonEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK, 
batchBaseOffset)
-      }
-
-      (log.highWatermark to log.logEndOffset).foreach { offset =>
-        val batchBaseOffset = batchBaseOffsets.rangeTo(offset).lastKey
-        assertEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK, 
batchBaseOffset)
-      }
-    }
-
-    assertHighWatermarkBoundedFetches()
-
-    log.updateHighWatermark(3L)
-    assertHighWatermarkBoundedFetches()
-
-    log.updateHighWatermark(5L)
-    assertHighWatermarkBoundedFetches()
-  }
-
-  @Test
-  def testActiveProducers(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
-    val log = createLog(logDir, logConfig)
-
-    def assertProducerState(
-      producerId: Long,
-      producerEpoch: Short,
-      lastSequence: Int,
-      currentTxnStartOffset: Option[Long],
-      coordinatorEpoch: Option[Int]
-    ): Unit = {
-      val producerStateOpt = log.activeProducers.asScala.find(_.producerId == 
producerId)
-      assertTrue(producerStateOpt.isDefined)
-
-      val producerState = producerStateOpt.get
-      assertEquals(producerEpoch, producerState.producerEpoch)
-      assertEquals(lastSequence, producerState.lastSequence)
-      assertEquals(currentTxnStartOffset.getOrElse(-1L), 
producerState.currentTxnStartOffset)
-      assertEquals(coordinatorEpoch.getOrElse(-1), 
producerState.coordinatorEpoch)
-    }
-
-    // Test transactional producer state (open transaction)
-    val producer1Epoch = 5.toShort
-    val producerId1 = 1L
-    LogTestUtils.appendTransactionalAsLeader(log, producerId1, producer1Epoch, 
mockTime)(5)
-    assertProducerState(
-      producerId1,
-      producer1Epoch,
-      lastSequence = 4,
-      currentTxnStartOffset = Some(0L),
-      coordinatorEpoch = None
-    )
-
-    // Test transactional producer state (closed transaction)
-    val coordinatorEpoch = 15
-    LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId1, producer1Epoch, 
ControlRecordType.COMMIT,
-      mockTime.milliseconds(), coordinatorEpoch, leaderEpoch = 0, 
transactionVersion = TransactionVersion.TV_0.featureLevel())
-    assertProducerState(
-      producerId1,
-      producer1Epoch,
-      lastSequence = 4,
-      currentTxnStartOffset = None,
-      coordinatorEpoch = Some(coordinatorEpoch)
-    )
-
-    // Test idempotent producer state
-    val producer2Epoch = 5.toShort
-    val producerId2 = 2L
-    LogTestUtils.appendIdempotentAsLeader(log, producerId2, producer2Epoch, 
mockTime)(3)
-    assertProducerState(
-      producerId2,
-      producer2Epoch,
-      lastSequence = 2,
-      currentTxnStartOffset = None,
-      coordinatorEpoch = None
-    )
-  }
-
-  @Test
-  def testFetchUpToLastStableOffset(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
-    val log = createLog(logDir, logConfig)
-    val epoch = 0.toShort
-
-    val producerId1 = 1L
-    val producerId2 = 2L
-
-    val appendProducer1 = LogTestUtils.appendTransactionalAsLeader(log, 
producerId1, epoch, mockTime)
-    val appendProducer2 = LogTestUtils.appendTransactionalAsLeader(log, 
producerId2, epoch, mockTime)
-
-    appendProducer1(5)
-    LogTestUtils.appendNonTransactionalAsLeader(log, 3)
-    appendProducer2(2)
-    appendProducer1(4)
-    LogTestUtils.appendNonTransactionalAsLeader(log, 2)
-    appendProducer1(10)
-
-    val batchBaseOffsets = SortedSet[Long](0, 5, 8, 10, 14, 16, 26, 27, 28)
-
-    def assertLsoBoundedFetches(): Unit = {
-      (log.logStartOffset until log.lastStableOffset).foreach { offset =>
-        val batchBaseOffset = batchBaseOffsets.rangeTo(offset).lastKey
-        assertNonEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED, 
batchBaseOffset)
-      }
-
-      (log.lastStableOffset to log.logEndOffset).foreach { offset =>
-        val batchBaseOffset = batchBaseOffsets.rangeTo(offset).lastKey
-        assertEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED, 
batchBaseOffset)
-      }
-    }
-
-    assertLsoBoundedFetches()
-
-    log.updateHighWatermark(log.logEndOffset)
-    assertLsoBoundedFetches()
-
-    LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId1, epoch, 
ControlRecordType.COMMIT, mockTime.milliseconds(),
-      transactionVersion = TransactionVersion.TV_0.featureLevel())
-    assertEquals(0L, log.lastStableOffset)
-
-    log.updateHighWatermark(log.logEndOffset)
-    assertEquals(8L, log.lastStableOffset)
-    assertLsoBoundedFetches()
-
-    LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId2, epoch, 
ControlRecordType.ABORT, mockTime.milliseconds(),
-      transactionVersion = TransactionVersion.TV_0.featureLevel())
-    assertEquals(8L, log.lastStableOffset)
-
-    log.updateHighWatermark(log.logEndOffset)
-    assertEquals(log.logEndOffset, log.lastStableOffset)
-    assertLsoBoundedFetches()
-  }
-
-  /**
-   * Tests for time based log roll. This test appends messages then changes 
the time
-   * using the mock clock to force the log to roll and checks the number of 
segments.
-   */
-  @Test
-  def testTimeBasedLogRollDuringAppend(): Unit = {
-    def createRecords = TestUtils.singletonRecords("test".getBytes)
-    val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L)
-
-    // create a log
-    val log = createLog(logDir, logConfig, producerStateManagerConfig = new 
ProducerStateManagerConfig(24 * 60, false))
-    assertEquals(1, log.numberOfSegments, "Log begins with a single empty 
segment.")
-    // Test the segment rolling behavior when messages do not have a timestamp.
-    mockTime.sleep(log.config.segmentMs + 1)
-    log.appendAsLeader(createRecords, 0)
-    assertEquals(1, log.numberOfSegments, "Log doesn't roll if doing so 
creates an empty segment.")
-
-    log.appendAsLeader(createRecords, 0)
-    assertEquals(2, log.numberOfSegments, "Log rolls on this append since time 
has expired.")
-
-    for (numSegments <- 3 until 5) {
-      mockTime.sleep(log.config.segmentMs + 1)
-      log.appendAsLeader(createRecords, 0)
-      assertEquals(numSegments, log.numberOfSegments, "Changing time beyond 
rollMs and appending should create a new segment.")
-    }
-
-    // Append a message with timestamp to a segment whose first message do not 
have a timestamp.
-    val timestamp = mockTime.milliseconds + log.config.segmentMs + 1
-    def createRecordsWithTimestamp = TestUtils.singletonRecords(value = 
"test".getBytes, timestamp = timestamp)
-    log.appendAsLeader(createRecordsWithTimestamp, 0)
-    assertEquals(4, log.numberOfSegments, "Segment should not have been rolled 
out because the log rolling should be based on wall clock.")
-
-    // Test the segment rolling behavior when messages have timestamps.
-    mockTime.sleep(log.config.segmentMs + 1)
-    log.appendAsLeader(createRecordsWithTimestamp, 0)
-    assertEquals(5, log.numberOfSegments, "A new segment should have been 
rolled out")
-
-    // move the wall clock beyond log rolling time
-    mockTime.sleep(log.config.segmentMs + 1)
-    log.appendAsLeader(createRecordsWithTimestamp, 0)
-    assertEquals(5, log.numberOfSegments, "Log should not roll because the 
roll should depend on timestamp of the first message.")
-
-    val recordWithExpiredTimestamp = TestUtils.singletonRecords(value = 
"test".getBytes, timestamp = mockTime.milliseconds)
-    log.appendAsLeader(recordWithExpiredTimestamp, 0)
-    assertEquals(6, log.numberOfSegments, "Log should roll because the 
timestamp in the message should make the log segment expire.")
-
-    val numSegments = log.numberOfSegments
-    mockTime.sleep(log.config.segmentMs + 1)
-    log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE), 0)
-    assertEquals(numSegments, log.numberOfSegments, "Appending an empty 
message set should not roll log even if sufficient time has passed.")
-  }
-
-  @Test
-  def testRollSegmentThatAlreadyExists(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L)
-    val partitionLeaderEpoch = 0
-
-    // create a log
-    val log = createLog(logDir, logConfig)
-    assertEquals(1, log.numberOfSegments, "Log begins with a single empty 
segment.")
-
-    // roll active segment with the same base offset of size zero should 
recreate the segment
-    log.roll(Optional.of(0L))
-    assertEquals(1, log.numberOfSegments, "Expect 1 segment after roll() empty 
segment with base offset.")
-
-    // should be able to append records to active segment
-    val records = TestUtils.records(
-      List(new SimpleRecord(mockTime.milliseconds, "k1".getBytes, 
"v1".getBytes)),
-      baseOffset = 0L, partitionLeaderEpoch = partitionLeaderEpoch)
-    log.appendAsFollower(records, partitionLeaderEpoch)
-    assertEquals(1, log.numberOfSegments, "Expect one segment.")
-    assertEquals(0L, log.activeSegment.baseOffset)
-
-    // make sure we can append more records
-    val records2 = TestUtils.records(
-      List(new SimpleRecord(mockTime.milliseconds + 10, "k2".getBytes, 
"v2".getBytes)),
-      baseOffset = 1L, partitionLeaderEpoch = partitionLeaderEpoch)
-    log.appendAsFollower(records2, partitionLeaderEpoch)
-
-    assertEquals(2, log.logEndOffset, "Expect two records in the log")
-    assertEquals(0, LogTestUtils.readLog(log, 0, 
1).records.batches.iterator.next().lastOffset)
-    assertEquals(1, LogTestUtils.readLog(log, 1, 
1).records.batches.iterator.next().lastOffset)
-
-    // roll so that active segment is empty
-    log.roll()
-    assertEquals(2L, log.activeSegment.baseOffset, "Expect base offset of 
active segment to be LEO")
-    assertEquals(2, log.numberOfSegments, "Expect two segments.")
-
-    // manually resize offset index to force roll of an empty active segment 
on next append
-    log.activeSegment.offsetIndex.resize(0)
-    val records3 = TestUtils.records(
-      List(new SimpleRecord(mockTime.milliseconds + 12, "k3".getBytes, 
"v3".getBytes)),
-      baseOffset = 2L, partitionLeaderEpoch = partitionLeaderEpoch)
-    log.appendAsFollower(records3, partitionLeaderEpoch)
-    assertTrue(log.activeSegment.offsetIndex.maxEntries > 1)
-    assertEquals(2, LogTestUtils.readLog(log, 2, 
1).records.batches.iterator.next().lastOffset)
-    assertEquals(2, log.numberOfSegments, "Expect two segments.")
-  }
-
-  @Test
-  def testNonSequentialAppend(): Unit = {
-    // create a log
-    val log = createLog(logDir, new LogConfig(new Properties))
-    val pid = 1L
-    val epoch: Short = 0
-
-    val records = TestUtils.records(List(new 
SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)), 
producerId = pid, producerEpoch = epoch, sequence = 0)
-    log.appendAsLeader(records, 0)
-
-    val nextRecords = TestUtils.records(List(new 
SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)), 
producerId = pid, producerEpoch = epoch, sequence = 2)
-    assertThrows(classOf[OutOfOrderSequenceException], () => 
log.appendAsLeader(nextRecords, 0))
-  }
-
-  @Test
-  def testTruncateToEndOffsetClearsEpochCache(): Unit = {
-    val log = createLog(logDir, new LogConfig(new Properties))
-
-    // Seed some initial data in the log
-    val records = TestUtils.records(List(new SimpleRecord("a".getBytes), new 
SimpleRecord("b".getBytes)),
-      baseOffset = 27)
-    appendAsFollower(log, records, 19)
-    assertEquals(Optional.of(new EpochEntry(19, 27)), 
log.leaderEpochCache.latestEntry)
-    assertEquals(29, log.logEndOffset)
-
-    def verifyTruncationClearsEpochCache(epoch: Int, truncationOffset: Long): 
Unit = {
-      // Simulate becoming a leader
-      log.assignEpochStartOffset(epoch, log.logEndOffset)
-      assertEquals(Optional.of(new EpochEntry(epoch, 29)), 
log.leaderEpochCache.latestEntry)
-      assertEquals(29, log.logEndOffset)
-
-      // Now we become the follower and truncate to an offset greater
-      // than or equal to the log end offset. The trivial epoch entry
-      // at the end of the log should be gone
-      log.truncateTo(truncationOffset)
-      assertEquals(Optional.of(new EpochEntry(19, 27)), 
log.leaderEpochCache.latestEntry)
-      assertEquals(29, log.logEndOffset)
-    }
-
-    // Truncations greater than or equal to the log end offset should
-    // clear the epoch cache
-    verifyTruncationClearsEpochCache(epoch = 20, truncationOffset = 
log.logEndOffset)
-    verifyTruncationClearsEpochCache(epoch = 24, truncationOffset = 
log.logEndOffset + 1)
-  }
-
-  /**
-   * Test the values returned by the logSegments call
-   */
-  @Test
-  def testLogSegmentsCallCorrect(): Unit = {
-    // Create 3 segments and make sure we get the right values from various 
logSegments calls.
-    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, 
timestamp = mockTime.milliseconds)
-    def getSegmentOffsets(log :UnifiedLog, from: Long, to: Long) = 
log.logSegments(from, to).stream().map { _.baseOffset }.toList
-    val setSize = createRecords.sizeInBytes
-    val msgPerSeg = 10
-    val segmentSize = msgPerSeg * setSize  // each segment will be 10 messages
-    // create a log
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = segmentSize)
-    val log = createLog(logDir, logConfig)
-    assertEquals(1, log.numberOfSegments, "There should be exactly 1 segment.")
-
-    // segments expire in size
-    for (_ <- 1 to (2 * msgPerSeg + 2))
-      log.appendAsLeader(createRecords, 0)
-    assertEquals(3, log.numberOfSegments, "There should be exactly 3 
segments.")
-
-    // from == to should always be null
-    assertEquals(util.List.of(), getSegmentOffsets(log, 10, 10))
-    assertEquals(util.List.of(), getSegmentOffsets(log, 15, 15))
-
-    assertEquals(util.List.of(0L, 10L, 20L), getSegmentOffsets(log, 0, 21))
-
-    assertEquals(util.List.of(0L), getSegmentOffsets(log, 1, 5))
-    assertEquals(util.List.of(10L, 20L), getSegmentOffsets(log, 13, 21))
-    assertEquals(util.List.of(10L), getSegmentOffsets(log, 13, 17))
-
-    // from < to is bad
-    assertThrows(classOf[IllegalArgumentException], () => log.logSegments(10, 
0))
-  }
-
-  @Test
-  def testInitializationOfProducerSnapshotsUpgradePath(): Unit = {
-    // simulate the upgrade path by creating a new log with several segments, 
deleting the
-    // snapshot files, and then reloading the log
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 64 * 10)
-    var log = createLog(logDir, logConfig)
-    assertEquals(OptionalLong.empty(), log.oldestProducerSnapshotOffset)
-
-    for (i <- 0 to 100) {
-      val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes)
-      log.appendAsLeader(TestUtils.records(List(record)), 0)
-    }
-    assertTrue(log.logSegments.size >= 2)
-    val logEndOffset = log.logEndOffset
-    log.close()
-
-    LogTestUtils.deleteProducerSnapshotFiles(logDir)
-
-    // Reload after clean shutdown
-    log = createLog(logDir, logConfig, recoveryPoint = logEndOffset)
-    var expectedSnapshotOffsets = 
log.logSegments.asScala.map(_.baseOffset).takeRight(2).toVector :+ 
log.logEndOffset
-    assertEquals(expectedSnapshotOffsets, 
LogTestUtils.listProducerSnapshotOffsets(logDir))
-    log.close()
-
-    LogTestUtils.deleteProducerSnapshotFiles(logDir)
-
-    // Reload after unclean shutdown with recoveryPoint set to log end offset
-    log = createLog(logDir, logConfig, recoveryPoint = logEndOffset, 
lastShutdownClean = false)
-    assertEquals(expectedSnapshotOffsets, 
LogTestUtils.listProducerSnapshotOffsets(logDir))
-    log.close()
-
-    LogTestUtils.deleteProducerSnapshotFiles(logDir)
-
-    // Reload after unclean shutdown with recoveryPoint set to 0
-    log = createLog(logDir, logConfig, recoveryPoint = 0L, lastShutdownClean = 
false)
-    // We progressively create a snapshot for each segment after the recovery 
point
-    expectedSnapshotOffsets = 
log.logSegments.asScala.map(_.baseOffset).tail.toVector :+ log.logEndOffset
-    assertEquals(expectedSnapshotOffsets, 
LogTestUtils.listProducerSnapshotOffsets(logDir))
-    log.close()
-  }
-
-  @Test
-  def testLogReinitializeAfterManualDelete(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig()
-    // simulate a case where log data does not exist but the start offset is 
non-zero
-    val log = createLog(logDir, logConfig, logStartOffset = 500)
-    assertEquals(500, log.logStartOffset)
-    assertEquals(500, log.logEndOffset)
-  }
-
-  /**
-   * Test that "PeriodicProducerExpirationCheck" scheduled task gets canceled 
after log
-   * is deleted.
-   */
-  @Test
-  def testProducerExpireCheckAfterDelete(): Unit = {
-    val scheduler = new KafkaScheduler(1)
-    try {
-      scheduler.startup()
-      val logConfig = LogTestUtils.createLogConfig()
-      val log = createLog(logDir, logConfig, scheduler = scheduler)
-
-      val producerExpireCheck = log.producerExpireCheck
-      assertTrue(scheduler.taskRunning(producerExpireCheck), 
"producerExpireCheck isn't as part of scheduled tasks")
-
-      log.delete()
-      assertFalse(scheduler.taskRunning(producerExpireCheck),
-        "producerExpireCheck is part of scheduled tasks even after log 
deletion")
-    } finally {
-      scheduler.shutdown()
-    }
-  }
-
-  @Test
-  def testProducerIdMapOffsetUpdatedForNonIdempotentData(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
-    val log = createLog(logDir, logConfig)
-    val records = TestUtils.records(List(new 
SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)))
-    log.appendAsLeader(records, 0)
-    log.takeProducerSnapshot()
-    assertEquals(OptionalLong.of(1), log.latestProducerSnapshotOffset)
-  }
-
-  @Test
-  def testRebuildProducerIdMapWithCompactedData(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
-    val log = createLog(logDir, logConfig)
-    val pid = 1L
-    val producerEpoch = 0.toShort
-    val partitionLeaderEpoch = 0
-    val seq = 0
-    val baseOffset = 23L
-
-    // create a batch with a couple gaps to simulate compaction
-    val records = TestUtils.records(
-      producerId = pid,
-      producerEpoch = producerEpoch,
-      sequence = seq,
-      baseOffset = baseOffset,
-      records = List(
-        new SimpleRecord(mockTime.milliseconds(), "a".getBytes),
-        new SimpleRecord(mockTime.milliseconds(), "key".getBytes, 
"b".getBytes),
-        new SimpleRecord(mockTime.milliseconds(), "c".getBytes),
-        new SimpleRecord(mockTime.milliseconds(), "key".getBytes, "d".getBytes)
-      )
-    )
-    records.batches.forEach(_.setPartitionLeaderEpoch(partitionLeaderEpoch))
-
-    val filtered = ByteBuffer.allocate(2048)
-    records.filterTo(new RecordFilter(0, 0) {
-      override def checkBatchRetention(batch: RecordBatch): 
RecordFilter.BatchRetentionResult =
-        new 
RecordFilter.BatchRetentionResult(RecordFilter.BatchRetention.DELETE_EMPTY, 
false)
-      override def shouldRetainRecord(recordBatch: RecordBatch, record: 
Record): Boolean = !record.hasKey
-    }, filtered, BufferSupplier.NO_CACHING)
-    filtered.flip()
-    val filteredRecords = MemoryRecords.readableRecords(filtered)
-
-    log.appendAsFollower(filteredRecords, partitionLeaderEpoch)
-
-    // append some more data and then truncate to force rebuilding of the PID 
map
-    val moreRecords = TestUtils.records(
-      baseOffset = baseOffset + 4,
-      records = List(
-        new SimpleRecord(mockTime.milliseconds(), "e".getBytes),
-        new SimpleRecord(mockTime.milliseconds(), "f".getBytes)
-      )
-    )
-    
moreRecords.batches.forEach(_.setPartitionLeaderEpoch(partitionLeaderEpoch))
-    log.appendAsFollower(moreRecords, partitionLeaderEpoch)
-
-    log.truncateTo(baseOffset + 4)
-
-    val activeProducers = log.activeProducersWithLastSequence
-    assertTrue(activeProducers.containsKey(pid))
-
-    val lastSeq = activeProducers.get(pid)
-    assertEquals(3, lastSeq)
-  }
-
-  @Test
-  def testRebuildProducerStateWithEmptyCompactedBatch(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
-    val log = createLog(logDir, logConfig)
-    val pid = 1L
-    val producerEpoch = 0.toShort
-    val partitionLeaderEpoch = 0
-    val seq = 0
-    val baseOffset = 23L
-
-    // create an empty batch
-    val records = TestUtils.records(
-      producerId = pid,
-      producerEpoch = producerEpoch,
-      sequence = seq,
-      baseOffset = baseOffset,
-      records = List(
-        new SimpleRecord(mockTime.milliseconds(), "key".getBytes, 
"a".getBytes),
-        new SimpleRecord(mockTime.milliseconds(), "key".getBytes, "b".getBytes)
-      )
-    )
-    records.batches.forEach(_.setPartitionLeaderEpoch(partitionLeaderEpoch))
-
-    val filtered = ByteBuffer.allocate(2048)
-    records.filterTo(new RecordFilter(0, 0) {
-      override def checkBatchRetention(batch: RecordBatch): 
RecordFilter.BatchRetentionResult =
-        new 
RecordFilter.BatchRetentionResult(RecordFilter.BatchRetention.RETAIN_EMPTY, 
true)
-      override def shouldRetainRecord(recordBatch: RecordBatch, record: 
Record): Boolean = false
-    }, filtered, BufferSupplier.NO_CACHING)
-    filtered.flip()
-    val filteredRecords = MemoryRecords.readableRecords(filtered)
-
-    log.appendAsFollower(filteredRecords, partitionLeaderEpoch)
-
-    // append some more data and then truncate to force rebuilding of the PID 
map
-    val moreRecords = TestUtils.records(
-      baseOffset = baseOffset + 2,
-      records = List(
-        new SimpleRecord(mockTime.milliseconds(), "e".getBytes),
-        new SimpleRecord(mockTime.milliseconds(), "f".getBytes)
-      )
-    )
-    
moreRecords.batches.forEach(_.setPartitionLeaderEpoch(partitionLeaderEpoch))
-    log.appendAsFollower(moreRecords, partitionLeaderEpoch)
-
-    log.truncateTo(baseOffset + 2)
-
-    val activeProducers = log.activeProducersWithLastSequence
-    assertTrue(activeProducers.containsKey(pid))
-
-    val lastSeq = activeProducers.get(pid)
-    assertEquals(1, lastSeq)
-  }
-
-  @Test
-  def testUpdateProducerIdMapWithCompactedData(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
-    val log = createLog(logDir, logConfig)
-    val pid = 1L
-    val producerEpoch = 0.toShort
-    val partitionLeaderEpoch = 0
-    val seq = 0
-    val baseOffset = 23L
-
-    // create a batch with a couple gaps to simulate compaction
-    val records = TestUtils.records(
-      producerId = pid,
-      producerEpoch = producerEpoch,
-      sequence = seq,
-      baseOffset = baseOffset,
-      records = List(
-        new SimpleRecord(mockTime.milliseconds(), "a".getBytes),
-        new SimpleRecord(mockTime.milliseconds(), "key".getBytes, 
"b".getBytes),
-        new SimpleRecord(mockTime.milliseconds(), "c".getBytes),
-        new SimpleRecord(mockTime.milliseconds(), "key".getBytes, "d".getBytes)
-      )
-    )
-    records.batches.forEach(_.setPartitionLeaderEpoch(partitionLeaderEpoch))
-
-    val filtered = ByteBuffer.allocate(2048)
-    records.filterTo(new RecordFilter(0, 0) {
-      override def checkBatchRetention(batch: RecordBatch): 
RecordFilter.BatchRetentionResult =
-        new 
RecordFilter.BatchRetentionResult(RecordFilter.BatchRetention.DELETE_EMPTY, 
false)
-      override def shouldRetainRecord(recordBatch: RecordBatch, record: 
Record): Boolean = !record.hasKey
-    }, filtered, BufferSupplier.NO_CACHING)
-    filtered.flip()
-    val filteredRecords = MemoryRecords.readableRecords(filtered)
-
-    log.appendAsFollower(filteredRecords, partitionLeaderEpoch)
-    val activeProducers = log.activeProducersWithLastSequence
-    assertTrue(activeProducers.containsKey(pid))
-
-    val lastSeq = activeProducers.get(pid)
-    assertEquals(3, lastSeq)
-  }
-
-  @Test
-  def testProducerIdMapTruncateTo(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
-    val log = createLog(logDir, logConfig)
-    log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord("a".getBytes))), 0)
-    log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord("b".getBytes))), 0)
-    log.takeProducerSnapshot()
-
-    log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord("c".getBytes))), 0)
-    log.takeProducerSnapshot()
-
-    log.truncateTo(2)
-    assertEquals(OptionalLong.of(2), log.latestProducerSnapshotOffset)
-    assertEquals(2, log.latestProducerStateEndOffset)
-
-    log.truncateTo(1)
-    assertEquals(OptionalLong.of(1), log.latestProducerSnapshotOffset)
-    assertEquals(1, log.latestProducerStateEndOffset)
-
-    log.truncateTo(0)
-    assertEquals(OptionalLong.empty(), log.latestProducerSnapshotOffset)
-    assertEquals(0, log.latestProducerStateEndOffset)
-  }
-
-  @Test
-  def testProducerIdMapTruncateToWithNoSnapshots(): Unit = {
-    // This ensures that the upgrade optimization path cannot be hit after 
initial loading
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
-    val log = createLog(logDir, logConfig)
-    val pid = 1L
-    val epoch = 0.toShort
-
-    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)), 
producerId = pid,
-      producerEpoch = epoch, sequence = 0), 0)
-    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)), 
producerId = pid,
-      producerEpoch = epoch, sequence = 1), 0)
-
-    LogTestUtils.deleteProducerSnapshotFiles(logDir)
-
-    log.truncateTo(1L)
-    assertEquals(1, log.activeProducersWithLastSequence.size)
-
-    val lastSeq = log.activeProducersWithLastSequence.get(pid)
-    assertEquals(0, lastSeq)
-  }
-
-  @ParameterizedTest(name = "testRetentionDeletesProducerStateSnapshots with 
createEmptyActiveSegment: {0}")
-  @ValueSource(booleans = Array(true, false))
-  def testRetentionDeletesProducerStateSnapshots(createEmptyActiveSegment: 
Boolean): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5, 
retentionBytes = 0, retentionMs = 1000 * 60, fileDeleteDelayMs = 0)
-    val log = createLog(logDir, logConfig)
-    val pid1 = 1L
-    val epoch = 0.toShort
-
-    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)), 
producerId = pid1,
-      producerEpoch = epoch, sequence = 0), 0)
-    log.roll()
-    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)), 
producerId = pid1,
-      producerEpoch = epoch, sequence = 1), 0)
-    log.roll()
-    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes)), 
producerId = pid1,
-      producerEpoch = epoch, sequence = 2), 0)
-    if (createEmptyActiveSegment) {
-      log.roll()
-    }
-
-    log.updateHighWatermark(log.logEndOffset)
-
-    val numProducerSnapshots = if (createEmptyActiveSegment) 3 else 2
-    assertEquals(numProducerSnapshots, 
ProducerStateManager.listSnapshotFiles(logDir).size)
-    // Sleep to breach the retention period
-    mockTime.sleep(1000 * 60 + 1)
-    assertTrue(log.deleteOldSegments > 0, "At least one segment should be 
deleted")
-    // Sleep to breach the file delete delay and run scheduled file deletion 
tasks
-    mockTime.sleep(1)
-    assertEquals(1, ProducerStateManager.listSnapshotFiles(logDir).size,
-      "expect a single producer state snapshot remaining")
-    assertEquals(3, log.logStartOffset)
-  }
-
   @Test
   def testRetentionIdempotency(): Unit = {
     val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5, 
retentionBytes = -1, retentionMs = 900, fileDeleteDelayMs = 0)
diff --git 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
index 974913b91be..ed30e25032e 100644
--- 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
+++ 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
@@ -28,14 +28,21 @@ import 
org.apache.kafka.common.record.internal.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.internal.RecordBatch;
 import org.apache.kafka.common.record.internal.SimpleRecord;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.server.common.RequestLocal;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 
 public class LogTestUtils {
     public static LogSegment createSegment(long offset, File logDir, int 
indexIntervalBytes, Time time) throws IOException {
@@ -146,16 +153,98 @@ public class LogTestUtils {
                                         short producerEpoch,
                                         int sequence,
                                         long baseOffset,
-                                        int partitionLeaderEpoch) {
+                                        int partitionLeaderEpoch,
+                                        long timestamp) {
         ByteBuffer buf = 
ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records));
         MemoryRecordsBuilder builder = MemoryRecords.builder(buf, magicValue, 
codec, TimestampType.CREATE_TIME, baseOffset,
-            System.currentTimeMillis(), producerId, producerEpoch, sequence, 
false, partitionLeaderEpoch);
+            timestamp, producerId, producerEpoch, sequence, false, 
partitionLeaderEpoch);
 
         records.forEach(builder::append);
 
         return builder.build();
     }
 
+    public static MemoryRecords records(List<SimpleRecord> records,
+                                        byte magicValue,
+                                        Compression codec,
+                                        long producerId,
+                                        short producerEpoch,
+                                        int sequence,
+                                        long baseOffset,
+                                        int partitionLeaderEpoch) {
+        return records(records, magicValue, codec, producerId, producerEpoch, 
sequence, baseOffset, partitionLeaderEpoch, System.currentTimeMillis());
+    }
+
+    public static MemoryRecords records(List<SimpleRecord> records,
+                                        long producerId,
+                                        short producerEpoch,
+                                        int sequence,
+                                        long baseOffset) {
+        return records(records, RecordBatch.CURRENT_MAGIC_VALUE, 
Compression.NONE, producerId, producerEpoch, sequence, baseOffset, 
RecordBatch.NO_PARTITION_LEADER_EPOCH);
+    }
+
+    public static MemoryRecords records(List<SimpleRecord> records) {
+        return records(records, RecordBatch.CURRENT_MAGIC_VALUE, 
Compression.NONE, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, 
RecordBatch.NO_SEQUENCE, 0L, RecordBatch.NO_PARTITION_LEADER_EPOCH);
+    }
+
+    public static MemoryRecords records(List<SimpleRecord> records, long 
timestamp) {
+        return records(records, RecordBatch.CURRENT_MAGIC_VALUE, 
Compression.NONE, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, 
RecordBatch.NO_SEQUENCE, 0L, RecordBatch.NO_PARTITION_LEADER_EPOCH, timestamp);
+    }
+
+    public static MemoryRecords records(List<SimpleRecord> records, long 
baseOffset, int partitionLeaderEpoch) {
+        return records(records, RecordBatch.CURRENT_MAGIC_VALUE, 
Compression.NONE, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, 
RecordBatch.NO_SEQUENCE, baseOffset, partitionLeaderEpoch);
+    }
+
+    public static void deleteProducerSnapshotFiles(File logDir) {
+        Stream.of(logDir.listFiles())
+                .filter(f -> f.isFile() && 
f.getName().endsWith(LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX))
+                .forEach(f -> assertDoesNotThrow(() -> Utils.delete(f)));
+    }
+
+    public static List<Long> listProducerSnapshotOffsets(File logDir) throws 
IOException {
+        return ProducerStateManager.listSnapshotFiles(logDir).stream().map(f 
-> f.offset).sorted().toList();
+    }
+
+    public static void appendNonTransactionalAsLeader(UnifiedLog log, int 
numRecords) throws IOException {
+        List<SimpleRecord> simpleRecords = new ArrayList<>();
+        for (int i = 0; i < numRecords; i++) {
+            simpleRecords.add(new SimpleRecord(String.valueOf(i).getBytes()));
+        }
+        MemoryRecords records = MemoryRecords.withRecords(Compression.NONE, 
simpleRecords.toArray(new SimpleRecord[0]));
+        log.appendAsLeader(records, 0);
+    }
+
+    public static Consumer<Integer> appendTransactionalAsLeader(UnifiedLog log,
+                                                                long 
producerId,
+                                                                short 
producerEpoch,
+                                                                Time time) {
+        return appendIdempotentAsLeader(log, producerId, producerEpoch, time, 
true);
+    }
+
+    public static Consumer<Integer> appendIdempotentAsLeader(UnifiedLog log,
+                                                             long producerId,
+                                                             short 
producerEpoch,
+                                                             Time time,
+                                                             boolean 
isTransactional) {
+        final AtomicInteger sequence = new AtomicInteger(0);
+        return numRecords -> {
+            int baseSequence = sequence.get();
+            List<SimpleRecord> simpleRecords = new ArrayList<>();
+            for (int i = baseSequence; i < baseSequence + numRecords; i++) {
+                simpleRecords.add(new SimpleRecord(time.milliseconds(), 
String.valueOf(i).getBytes()));
+            }
+
+            MemoryRecords records = isTransactional
+                ? MemoryRecords.withTransactionalRecords(Compression.NONE, 
producerId,
+                        producerEpoch, baseSequence, simpleRecords.toArray(new 
SimpleRecord[0]))
+                : MemoryRecords.withIdempotentRecords(Compression.NONE, 
producerId,
+                        producerEpoch, baseSequence, simpleRecords.toArray(new 
SimpleRecord[0]));
+
+            assertDoesNotThrow(() -> log.appendAsLeader(records, 0));
+            sequence.addAndGet(numRecords);
+        };
+    }
+
     public static class LogConfigBuilder {
         private final Map<String, Object> configs = new HashMap<>();
 
diff --git 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
index efef8c8ee4c..e8d1b01391c 100644
--- 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
@@ -18,18 +18,24 @@ package org.apache.kafka.storage.internals.log;
 
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.message.DescribeProducersResponseData;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.record.internal.ControlRecordType;
 import org.apache.kafka.common.record.internal.DefaultRecordBatch;
 import org.apache.kafka.common.record.internal.MemoryRecords;
 import org.apache.kafka.common.record.internal.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.internal.Record;
 import org.apache.kafka.common.record.internal.RecordBatch;
 import org.apache.kafka.common.record.internal.SimpleRecord;
+import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
 import org.apache.kafka.server.common.TransactionVersion;
 import org.apache.kafka.server.metrics.KafkaYammerMetrics;
 import org.apache.kafka.server.storage.log.FetchIsolation;
+import org.apache.kafka.server.storage.log.UnexpectedAppendOffsetException;
+import org.apache.kafka.server.util.KafkaScheduler;
 import org.apache.kafka.server.util.MockTime;
 import org.apache.kafka.server.util.Scheduler;
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
@@ -42,6 +48,7 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
 import java.io.IOException;
@@ -49,13 +56,22 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.Properties;
+import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.doThrow;
@@ -63,6 +79,10 @@ import static org.mockito.Mockito.spy;
 
 public class UnifiedLogTest {
 
+    private static final int ONE_MB = 1024 * 1024;
+    private static final int TEN_KB = 2048 * 5;
+    private static final long ONE_HOUR = 60 * 60L;
+
     private final File tmpDir = TestUtils.tempDirectory();
     private final File logDir = TestUtils.randomPartitionLogDir(tmpDir);
     private final BrokerTopicStats brokerTopicStats = new 
BrokerTopicStats(false);
@@ -77,7 +97,7 @@ public class UnifiedLogTest {
     public void tearDown() throws IOException {
         brokerTopicStats.close();
         for (UnifiedLog log : logsToClose) {
-            log.close();
+            Utils.closeQuietly(log, "UnifiedLog");
         }
         Utils.delete(tmpDir);
     }
@@ -570,7 +590,7 @@ public class UnifiedLogTest {
     @Test
     public void testFirstUnstableOffsetNoTransactionalData() throws 
IOException {
         LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
-                .segmentBytes(1024 * 1024 * 5)
+                .segmentBytes(5 * ONE_MB)
                 .build();
         log = createLog(logDir, logConfig);
 
@@ -586,7 +606,7 @@ public class UnifiedLogTest {
     @Test
     public void testFirstUnstableOffsetWithTransactionalData() throws 
IOException {
         LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
-                .segmentBytes(1024 * 1024 * 5)
+                .segmentBytes(5 * ONE_MB)
                 .build();
         log = createLog(logDir, logConfig);
 
@@ -622,6 +642,988 @@ public class UnifiedLogTest {
         assertEquals(Optional.empty(), log.firstUnstableOffset());
     }
 
+    @Test
+    public void testHighWatermarkMetadataUpdatedAfterSegmentRoll() throws 
IOException {
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+
+        MemoryRecords records = LogTestUtils.records(List.of(
+                new SimpleRecord(mockTime.milliseconds(), "a".getBytes(), 
"value".getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), "b".getBytes(), 
"value".getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), "c".getBytes(), 
"value".getBytes())
+        ));
+
+        log.appendAsLeader(records, 0);
+        assertFetchSizeAndOffsets(log, 0L, 0, List.of());
+
+        log.maybeIncrementHighWatermark(log.logEndOffsetMetadata());
+        assertFetchSizeAndOffsets(log, 0L, records.sizeInBytes(), List.of(0L, 
1L, 2L));
+
+        log.roll();
+        assertFetchSizeAndOffsets(log, 0L, records.sizeInBytes(), List.of(0L, 
1L, 2L));
+
+        log.appendAsLeader(records, 0);
+        assertFetchSizeAndOffsets(log, 3L, 0, List.of());
+    }
+
+    private void assertFetchSizeAndOffsets(UnifiedLog log, long fetchOffset, 
int expectedSize, List<Long> expectedOffsets) throws IOException {
+        FetchDataInfo readInfo = log.read(
+                fetchOffset,
+                2048,
+                FetchIsolation.HIGH_WATERMARK,
+                false);
+        assertEquals(expectedSize, readInfo.records.sizeInBytes());
+        List<Long> actualOffsets = new ArrayList<>();
+        readInfo.records.records().forEach(record -> 
actualOffsets.add(record.offset()));
+        assertEquals(expectedOffsets, actualOffsets);
+    }
+
+    @Test
+    public void testAppendAsLeaderWithRaftLeader() throws IOException {
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        int leaderEpoch = 0;
+
+        Function<Long, MemoryRecords> records = offset -> 
LogTestUtils.records(List.of(
+                new SimpleRecord(mockTime.milliseconds(), "a".getBytes(), 
"value".getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), "b".getBytes(), 
"value".getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), "c".getBytes(), 
"value".getBytes())
+        ), RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, 
RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, 
RecordBatch.NO_SEQUENCE, offset, leaderEpoch);
+
+        log.appendAsLeader(records.apply(0L), leaderEpoch, 
AppendOrigin.RAFT_LEADER);
+        assertEquals(0, log.logStartOffset());
+        assertEquals(3L, log.logEndOffset());
+
+        // Since raft leader is responsible for assigning offsets, and the 
LogValidator is bypassed from the performance perspective,
+        // so the first offset of the MemoryRecords to be appended should 
equal to the next offset in the log
+        assertThrows(UnexpectedAppendOffsetException.class, () -> 
log.appendAsLeader(records.apply(1L), leaderEpoch, AppendOrigin.RAFT_LEADER));
+
+        // When the first offset of the MemoryRecords to be appended equals to 
the next offset in the log, append will succeed
+        log.appendAsLeader(records.apply(3L), leaderEpoch, 
AppendOrigin.RAFT_LEADER);
+        assertEquals(6, log.logEndOffset());
+    }
+
+    @Test
+    public void testAppendInfoFirstOffset() throws IOException {
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+
+        List<SimpleRecord> simpleRecords = List.of(
+                new SimpleRecord(mockTime.milliseconds(), "a".getBytes(), 
"value".getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), "b".getBytes(), 
"value".getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), "c".getBytes(), 
"value".getBytes())
+        );
+
+        MemoryRecords records = LogTestUtils.records(simpleRecords);
+
+        LogAppendInfo firstAppendInfo = log.appendAsLeader(records, 0);
+        assertEquals(0, firstAppendInfo.firstOffset());
+
+        LogAppendInfo secondAppendInfo = log.appendAsLeader(
+                LogTestUtils.records(simpleRecords),
+                0
+        );
+        assertEquals(simpleRecords.size(), secondAppendInfo.firstOffset());
+
+        log.roll();
+        LogAppendInfo afterRollAppendInfo =  
log.appendAsLeader(LogTestUtils.records(simpleRecords), 0);
+        assertEquals(simpleRecords.size() * 2, 
afterRollAppendInfo.firstOffset());
+    }
+
+    @Test
+    public void testTruncateBelowFirstUnstableOffset() throws IOException {
+        testTruncateBelowFirstUnstableOffset(UnifiedLog::truncateTo);
+    }
+
+    @Test
+    public void testTruncateFullyAndStartBelowFirstUnstableOffset() throws 
IOException {
+        testTruncateBelowFirstUnstableOffset((log, targetOffset) -> 
log.truncateFullyAndStartAt(targetOffset, Optional.empty()));
+    }
+
+    @Test
+    public void testTruncateFullyAndStart() throws IOException {
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        long producerId = 17L;
+        short producerEpoch = 10;
+        int sequence = 0;
+
+        log.appendAsLeader(LogTestUtils.records(List.of(
+                new SimpleRecord("0".getBytes()),
+                new SimpleRecord("1".getBytes()),
+                new SimpleRecord("2".getBytes())
+        )), 0);
+        log.appendAsLeader(MemoryRecords.withTransactionalRecords(
+                Compression.NONE,
+                producerId,
+                producerEpoch,
+                sequence,
+                new SimpleRecord("3".getBytes()),
+                new SimpleRecord("4".getBytes())
+        ), 0);
+        assertEquals(Optional.of(3L), log.firstUnstableOffset());
+
+        // We close and reopen the log to ensure that the first unstable 
offset segment
+        // position will be undefined when we truncate the log.
+        log.close();
+
+        UnifiedLog reopened = createLog(logDir, logConfig);
+        assertEquals(Optional.of(new LogOffsetMetadata(3L)), 
reopened.producerStateManager().firstUnstableOffset());
+
+        reopened.truncateFullyAndStartAt(2L, Optional.of(1L));
+        assertEquals(Optional.empty(), reopened.firstUnstableOffset());
+        assertEquals(Map.of(), 
reopened.producerStateManager().activeProducers());
+        assertEquals(1L, reopened.logStartOffset());
+        assertEquals(2L, reopened.logEndOffset());
+    }
+
+    private void testTruncateBelowFirstUnstableOffset(BiConsumer<UnifiedLog, 
Long> truncateFunc) throws IOException {
+        // Verify that truncation below the first unstable offset correctly
+        // resets the producer state. Specifically we are testing the case when
+        // the segment position of the first unstable offset is unknown.
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        long producerId = 17L;
+        short producerEpoch = 10;
+        int sequence = 0;
+
+        log.appendAsLeader(LogTestUtils.records(List.of(
+                new SimpleRecord("0".getBytes()),
+                new SimpleRecord("1".getBytes()),
+                new SimpleRecord("2".getBytes())
+        )), 0);
+        log.appendAsLeader(MemoryRecords.withTransactionalRecords(
+                Compression.NONE,
+                producerId,
+                producerEpoch,
+                sequence,
+                new SimpleRecord("3".getBytes()),
+                new SimpleRecord("4".getBytes())
+        ), 0);
+        assertEquals(Optional.of(3L), log.firstUnstableOffset());
+
+        // We close and reopen the log to ensure that the first unstable 
offset segment
+        // position will be undefined when we truncate the log.
+        log.close();
+
+        UnifiedLog reopened = createLog(logDir, logConfig);
+        assertEquals(Optional.of(new LogOffsetMetadata(3L)), 
reopened.producerStateManager().firstUnstableOffset());
+
+        truncateFunc.accept(reopened, 0L);
+        assertEquals(Optional.empty(), reopened.firstUnstableOffset());
+        assertEquals(Map.of(), 
reopened.producerStateManager().activeProducers());
+    }
+
+    @Test
+    public void testHighWatermarkMaintenance() throws IOException {
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        int leaderEpoch = 0;
+
+        Function<Long, MemoryRecords> records = offset -> 
LogTestUtils.records(List.of(
+                new SimpleRecord(mockTime.milliseconds(), "a".getBytes(), 
"value".getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), "b".getBytes(), 
"value".getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), "c".getBytes(), 
"value".getBytes())
+        ), RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, 
RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, 
RecordBatch.NO_SEQUENCE, offset, leaderEpoch);
+
+        // High watermark initialized to 0
+        assertHighWatermark(log, 0L);
+
+        // High watermark not changed by append
+        log.appendAsLeader(records.apply(0L), leaderEpoch);
+        assertHighWatermark(log, 0L);
+
+        // Update high watermark as leader
+        log.maybeIncrementHighWatermark(new LogOffsetMetadata(1L));
+        assertHighWatermark(log, 1L);
+
+        // Cannot update past the log end offset
+        log.updateHighWatermark(5L);
+        assertHighWatermark(log, 3L);
+
+        // Update high watermark as follower
+        log.appendAsFollower(records.apply(3L), leaderEpoch);
+        log.updateHighWatermark(6L);
+        assertHighWatermark(log, 6L);
+
+        // High watermark should be adjusted by truncation
+        log.truncateTo(3L);
+        assertHighWatermark(log, 3L);
+
+        log.appendAsLeader(records.apply(0L), 0);
+        assertHighWatermark(log, 3L);
+        assertEquals(6L, log.logEndOffset());
+        assertEquals(0L, log.logStartOffset());
+
+        // Full truncation should also reset high watermark
+        log.truncateFullyAndStartAt(4L, Optional.empty());
+        assertEquals(4L, log.logEndOffset());
+        assertEquals(4L, log.logStartOffset());
+        assertHighWatermark(log, 4L);
+    }
+
+    private void assertHighWatermark(UnifiedLog log, long offset) throws 
IOException {
+        assertEquals(offset, log.highWatermark());
+        assertValidLogOffsetMetadata(log, 
log.fetchOffsetSnapshot().highWatermark());
+    }
+
+    private void assertNonEmptyFetch(UnifiedLog log, long offset, 
FetchIsolation isolation, long batchBaseOffset) throws IOException {
+        FetchDataInfo readInfo = log.read(offset, Integer.MAX_VALUE, 
isolation, true);
+
+        assertFalse(readInfo.firstEntryIncomplete);
+        assertTrue(readInfo.records.sizeInBytes() > 0);
+
+        long upperBoundOffset = switch (isolation) {
+            case LOG_END -> log.logEndOffset();
+            case HIGH_WATERMARK -> log.highWatermark();
+            case TXN_COMMITTED -> log.lastStableOffset();
+        };
+
+        for (Record record : readInfo.records.records())
+            assertTrue(record.offset() < upperBoundOffset);
+
+        assertEquals(batchBaseOffset, 
readInfo.fetchOffsetMetadata.messageOffset);
+        assertValidLogOffsetMetadata(log, readInfo.fetchOffsetMetadata);
+    }
+
+    private void assertEmptyFetch(UnifiedLog log, long offset, FetchIsolation 
isolation, long batchBaseOffset) throws IOException {
+        FetchDataInfo readInfo = log.read(offset, Integer.MAX_VALUE, 
isolation, true);
+        assertFalse(readInfo.firstEntryIncomplete);
+        assertEquals(0, readInfo.records.sizeInBytes());
+        assertEquals(batchBaseOffset, 
readInfo.fetchOffsetMetadata.messageOffset);
+        assertValidLogOffsetMetadata(log, readInfo.fetchOffsetMetadata);
+    }
+
+    @Test
+    public void testFetchUpToLogEndOffset() throws IOException {
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+
+        log.appendAsLeader(LogTestUtils.records(List.of(
+                new SimpleRecord("0".getBytes()),
+                new SimpleRecord("1".getBytes()),
+                new SimpleRecord("2".getBytes())
+        )), 0);
+        log.appendAsLeader(LogTestUtils.records(List.of(
+                new SimpleRecord("3".getBytes()),
+                new SimpleRecord("4".getBytes())
+        )), 0);
+        TreeSet<Long> batchBaseOffsets = new TreeSet<>(List.of(0L, 3L, 5L));
+
+        for (long offset = log.logStartOffset(); offset < log.logEndOffset(); 
offset++) {
+            Long batchBaseOffset = batchBaseOffsets.floor(offset);
+            assertNotNull(batchBaseOffset);
+            assertNonEmptyFetch(log, offset, FetchIsolation.LOG_END, 
batchBaseOffset);
+        }
+    }
+
+    @Test
+    public void testFetchUpToHighWatermark() throws IOException {
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+
+        log.appendAsLeader(LogTestUtils.records(List.of(
+                new SimpleRecord("0".getBytes()),
+                new SimpleRecord("1".getBytes()),
+                new SimpleRecord("2".getBytes())
+        )), 0);
+        log.appendAsLeader(LogTestUtils.records(List.of(
+                new SimpleRecord("3".getBytes()),
+                new SimpleRecord("4".getBytes())
+        )), 0);
+        TreeSet<Long> batchBaseOffsets = new TreeSet<>(List.of(0L, 3L, 5L));
+
+        assertHighWatermarkBoundedFetches(log, batchBaseOffsets);
+
+        log.updateHighWatermark(3L);
+        assertHighWatermarkBoundedFetches(log, batchBaseOffsets);
+
+        log.updateHighWatermark(5L);
+        assertHighWatermarkBoundedFetches(log, batchBaseOffsets);
+    }
+
+    private void assertHighWatermarkBoundedFetches(UnifiedLog log, 
TreeSet<Long> batchBaseOffsets) throws IOException {
+        for (long offset = log.logStartOffset(); offset < log.highWatermark(); 
offset++) {
+            Long batchBaseOffset = batchBaseOffsets.floor(offset);
+            assertNotNull(batchBaseOffset);
+            assertNonEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK, 
batchBaseOffset);
+        }
+
+        for (long offset = log.highWatermark(); offset <= log.logEndOffset(); 
offset++) {
+            Long batchBaseOffset = batchBaseOffsets.floor(offset);
+            assertNotNull(batchBaseOffset);
+            assertEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK, 
batchBaseOffset);
+        }
+    }
+
+    @Test
+    public void testActiveProducers() throws IOException {
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+
+        // Test transactional producer state (open transaction)
+        short producer1Epoch = 5;
+        long producerId1 = 1L;
+        LogTestUtils.appendTransactionalAsLeader(log, producerId1, 
producer1Epoch, mockTime).accept(5);
+        assertProducerState(
+                log,
+                producerId1,
+                producer1Epoch,
+                4,
+                Optional.of(0L),
+                Optional.empty()
+        );
+
+        // Test transactional producer state (closed transaction)
+        int coordinatorEpoch = 15;
+        LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId1, 
producer1Epoch, ControlRecordType.COMMIT,
+                mockTime.milliseconds(), coordinatorEpoch, 0, 
TransactionVersion.TV_0.featureLevel());
+        assertProducerState(
+                log,
+                producerId1,
+                producer1Epoch,
+                4,
+                Optional.empty(),
+                Optional.of(coordinatorEpoch)
+        );
+
+        // Test idempotent producer state
+        short producer2Epoch = 5;
+        long producerId2 = 2L;
+        LogTestUtils.appendIdempotentAsLeader(log, producerId2, 
producer2Epoch, mockTime, false).accept(3);
+        assertProducerState(
+                log,
+                producerId2,
+                producer2Epoch,
+                2,
+                Optional.empty(),
+                Optional.empty()
+        );
+    }
+
+    private void assertProducerState(
+            UnifiedLog log,
+            long producerId,
+            short producerEpoch,
+            int lastSequence,
+            Optional<Long> currentTxnStartOffset,
+            Optional<Integer> coordinatorEpoch
+    ) {
+        Optional<DescribeProducersResponseData.ProducerState> producerStateOpt 
= log.activeProducers().stream().filter(p -> p.producerId() == 
producerId).findFirst();
+        assertTrue(producerStateOpt.isPresent());
+
+        DescribeProducersResponseData.ProducerState producerState = 
producerStateOpt.get();
+        assertEquals(producerEpoch, producerState.producerEpoch());
+        assertEquals(lastSequence, producerState.lastSequence());
+        assertEquals(currentTxnStartOffset.orElse(-1L), 
producerState.currentTxnStartOffset());
+        assertEquals(coordinatorEpoch.orElse(-1), 
producerState.coordinatorEpoch());
+    }
+
+    @Test
+    public void testFetchUpToLastStableOffset() throws IOException {
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        short epoch = 0;
+
+        long producerId1 = 1L;
+        long producerId2 = 2L;
+
+        Consumer<Integer> appendProducer1 = 
LogTestUtils.appendTransactionalAsLeader(log, producerId1, epoch, mockTime);
+        Consumer<Integer> appendProducer2 = 
LogTestUtils.appendTransactionalAsLeader(log, producerId2, epoch, mockTime);
+
+        appendProducer1.accept(5);
+        LogTestUtils.appendNonTransactionalAsLeader(log, 3);
+        appendProducer2.accept(2);
+        appendProducer1.accept(4);
+        LogTestUtils.appendNonTransactionalAsLeader(log, 2);
+        appendProducer1.accept(10);
+
+        TreeSet<Long> batchBaseOffsets = new TreeSet<>(List.of(0L, 5L, 8L, 
10L, 14L, 16L, 26L, 27L, 28L));
+
+        assertLsoBoundedFetches(log, batchBaseOffsets);
+
+        log.updateHighWatermark(log.logEndOffset());
+        assertLsoBoundedFetches(log, batchBaseOffsets);
+
+        LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId1, epoch, 
ControlRecordType.COMMIT, mockTime.milliseconds(),
+                0, 0, TransactionVersion.TV_0.featureLevel());
+        assertEquals(0L, log.lastStableOffset());
+
+        log.updateHighWatermark(log.logEndOffset());
+        assertEquals(8L, log.lastStableOffset());
+        assertLsoBoundedFetches(log, batchBaseOffsets);
+
+        LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId2, epoch, 
ControlRecordType.ABORT, mockTime.milliseconds(),
+                0, 0, TransactionVersion.TV_0.featureLevel());
+        assertEquals(8L, log.lastStableOffset());
+
+        log.updateHighWatermark(log.logEndOffset());
+        assertEquals(log.logEndOffset(), log.lastStableOffset());
+        assertLsoBoundedFetches(log, batchBaseOffsets);
+    }
+
+    private void assertLsoBoundedFetches(UnifiedLog log, TreeSet<Long> 
batchBaseOffsets) throws IOException {
+        for (long offset = log.logStartOffset(); offset < 
log.lastStableOffset(); offset++) {
+            Long batchBaseOffset = batchBaseOffsets.floor(offset);
+            assertNotNull(batchBaseOffset);
+            assertNonEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED, 
batchBaseOffset);
+        }
+
+        for (long offset = log.lastStableOffset(); offset <= 
log.logEndOffset(); offset++) {
+            Long batchBaseOffset = batchBaseOffsets.floor(offset);
+            assertNotNull(batchBaseOffset);
+            assertEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED, 
batchBaseOffset);
+        }
+    }
+
+    /**
+     * Tests for time based log roll. This test appends messages then changes 
the time
+     * using the mock clock to force the log to roll and checks the number of 
segments.
+     */
+    @Test
+    public void testTimeBasedLogRollDuringAppend() throws IOException {
+        Supplier<MemoryRecords> createRecords = () -> 
LogTestUtils.records(List.of(new SimpleRecord("test".getBytes())));
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentMs(ONE_HOUR).build();
+
+        // create a log
+        UnifiedLog log = createLog(logDir, logConfig, 0L, 0L, 
brokerTopicStats, mockTime.scheduler, mockTime,
+                new ProducerStateManagerConfig(24 * 60, false), true, 
Optional.empty(), false);
+        assertEquals(1, log.numberOfSegments(), "Log begins with a single 
empty segment.");
+        // Test the segment rolling behavior when messages do not have a 
timestamp.
+        mockTime.sleep(log.config().segmentMs + 1);
+        log.appendAsLeader(createRecords.get(), 0);
+        assertEquals(1, log.numberOfSegments(), "Log doesn't roll if doing so 
creates an empty segment.");
+
+        log.appendAsLeader(createRecords.get(), 0);
+        assertEquals(2, log.numberOfSegments(), "Log rolls on this append 
since time has expired.");
+
+        for (int numSegments = 3; numSegments < 5; numSegments++) {
+            mockTime.sleep(log.config().segmentMs + 1);
+            log.appendAsLeader(createRecords.get(), 0);
+            assertEquals(numSegments, log.numberOfSegments(), "Changing time 
beyond rollMs and appending should create a new segment.");
+        }
+
+        // Append a message with timestamp to a segment whose first message do 
not have a timestamp.
+        long timestamp = mockTime.milliseconds() + log.config().segmentMs + 1;
+        Supplier<MemoryRecords> recordWithTimestamp = () -> 
LogTestUtils.records(List.of(new SimpleRecord(timestamp, "test".getBytes())));
+        log.appendAsLeader(recordWithTimestamp.get(), 0);
+        assertEquals(4, log.numberOfSegments(), "Segment should not have been 
rolled out because the log rolling should be based on wall clock.");
+
+        // Test the segment rolling behavior when messages have timestamps.
+        mockTime.sleep(log.config().segmentMs + 1);
+        log.appendAsLeader(recordWithTimestamp.get(), 0);
+        assertEquals(5, log.numberOfSegments(), "A new segment should have 
been rolled out");
+
+        // move the wall clock beyond log rolling time
+        mockTime.sleep(log.config().segmentMs + 1);
+        log.appendAsLeader(recordWithTimestamp.get(), 0);
+        assertEquals(5, log.numberOfSegments(), "Log should not roll because 
the roll should depend on timestamp of the first message.");
+
+        Supplier<MemoryRecords> recordWithExpiredTimestamp = () -> 
LogTestUtils.records(List.of(new SimpleRecord(mockTime.milliseconds(), 
"test".getBytes())));
+        log.appendAsLeader(recordWithExpiredTimestamp.get(), 0);
+        assertEquals(6, log.numberOfSegments(), "Log should roll because the 
timestamp in the message should make the log segment expire.");
+
+        int numSegments = log.numberOfSegments();
+        mockTime.sleep(log.config().segmentMs + 1);
+        log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE), 0);
+        assertEquals(numSegments, log.numberOfSegments(), "Appending an empty 
message set should not roll log even if sufficient time has passed.");
+    }
+
+    @Test
+    public void testRollSegmentThatAlreadyExists() throws IOException {
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentMs(ONE_HOUR).build();
+        int partitionLeaderEpoch = 0;
+
+        // create a log
+        UnifiedLog log = createLog(logDir, logConfig);
+        assertEquals(1, log.numberOfSegments(), "Log begins with a single 
empty segment.");
+
+        // roll active segment with the same base offset of size zero should 
recreate the segment
+        log.roll(Optional.of(0L));
+        assertEquals(1, log.numberOfSegments(), "Expect 1 segment after roll() 
empty segment with base offset.");
+
+        // should be able to append records to active segment
+        MemoryRecords records = LogTestUtils.records(
+                List.of(new SimpleRecord(mockTime.milliseconds(), 
"k1".getBytes(), "v1".getBytes())),
+                0L, partitionLeaderEpoch);
+        log.appendAsFollower(records, partitionLeaderEpoch);
+        assertEquals(1, log.numberOfSegments(), "Expect one segment.");
+        assertEquals(0L, log.activeSegment().baseOffset());
+
+        // make sure we can append more records
+        MemoryRecords records2 = LogTestUtils.records(
+                List.of(new SimpleRecord(mockTime.milliseconds() + 10, 
"k2".getBytes(), "v2".getBytes())),
+                1L, partitionLeaderEpoch);
+        log.appendAsFollower(records2, partitionLeaderEpoch);
+
+        assertEquals(2, log.logEndOffset(), "Expect two records in the log");
+        assertEquals(0, log.read(0, 1, FetchIsolation.LOG_END, 
true).records.batches().iterator().next().lastOffset());
+        assertEquals(1, log.read(1, 1, FetchIsolation.LOG_END, 
true).records.batches().iterator().next().lastOffset());
+
+        // roll so that active segment is empty
+        log.roll();
+        assertEquals(2L, log.activeSegment().baseOffset(), "Expect base offset 
of active segment to be LEO");
+        assertEquals(2, log.numberOfSegments(), "Expect two segments.");
+
+        // manually resize offset index to force roll of an empty active 
segment on next append
+        log.activeSegment().offsetIndex().resize(0);
+        MemoryRecords records3 = LogTestUtils.records(
+                List.of(new SimpleRecord(mockTime.milliseconds() + 12, 
"k3".getBytes(), "v3".getBytes())),
+                2L, partitionLeaderEpoch);
+        log.appendAsFollower(records3, partitionLeaderEpoch);
+        assertTrue(log.activeSegment().offsetIndex().maxEntries() > 1);
+        assertEquals(2, log.read(2, 1, FetchIsolation.LOG_END, 
true).records.batches().iterator().next().lastOffset());
+        assertEquals(2, log.numberOfSegments(), "Expect two segments.");
+    }
+
+    @Test
+    public void testNonSequentialAppend() throws IOException {
+        // create a log
+        UnifiedLog log = createLog(logDir, new LogConfig(new Properties()));
+        long pid = 1L;
+        short epoch = 0;
+
+        MemoryRecords records = LogTestUtils.records(
+                List.of(new SimpleRecord(mockTime.milliseconds(), 
"key".getBytes(), "value".getBytes())),
+                pid, epoch, 0, 0L);
+        log.appendAsLeader(records, 0);
+
+        MemoryRecords nextRecords = LogTestUtils.records(
+                List.of(new SimpleRecord(mockTime.milliseconds(), 
"key".getBytes(), "value".getBytes())),
+                pid, epoch, 2, 0L);
+        assertThrows(OutOfOrderSequenceException.class, () -> 
log.appendAsLeader(nextRecords, 0));
+    }
+
+    @Test
+    public void testTruncateToEndOffsetClearsEpochCache() throws IOException {
+        UnifiedLog log = createLog(logDir, new LogConfig(new Properties()));
+
+        // Seed some initial data in the log
+        MemoryRecords records = LogTestUtils.records(List.of(new 
SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())),
+                27, RecordBatch.NO_PARTITION_LEADER_EPOCH);
+        appendAsFollower(log, records, 19);
+        assertEquals(Optional.of(new EpochEntry(19, 27)), 
log.leaderEpochCache().latestEntry());
+        assertEquals(29, log.logEndOffset());
+
+        // Truncations greater than or equal to the log end offset should
+        // clear the epoch cache
+        verifyTruncationClearsEpochCache(log, 20, log.logEndOffset());
+        verifyTruncationClearsEpochCache(log, 24, log.logEndOffset() + 1);
+    }
+
+    private void verifyTruncationClearsEpochCache(UnifiedLog log, int epoch, 
long truncationOffset) {
+        // Simulate becoming a leader
+        log.assignEpochStartOffset(epoch, log.logEndOffset());
+        assertEquals(Optional.of(new EpochEntry(epoch, 29)), 
log.leaderEpochCache().latestEntry());
+        assertEquals(29, log.logEndOffset());
+
+        // Now we become the follower and truncate to an offset greater
+        // than or equal to the log end offset. The trivial epoch entry
+        // at the end of the log should be gone
+        log.truncateTo(truncationOffset);
+        assertEquals(Optional.of(new EpochEntry(19, 27)), 
log.leaderEpochCache().latestEntry());
+        assertEquals(29, log.logEndOffset());
+    }
+
+    /**
+     * Test the values returned by the logSegments call
+     */
+    @Test
+    public void testLogSegmentsCallCorrect() throws IOException {
+        // Create 3 segments and make sure we get the right values from 
various logSegments calls.
+        Supplier<MemoryRecords> createRecords = () -> 
LogTestUtils.records(List.of(new SimpleRecord("test".getBytes())), 
mockTime.milliseconds());
+
+        int setSize = createRecords.get().sizeInBytes();
+        int msgPerSeg = 10;
+        int segmentSize = msgPerSeg * setSize;  // each segment will be 10 
messages
+        // create a log
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(segmentSize).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        assertEquals(1, log.numberOfSegments(), "There should be exactly 1 
segment.");
+
+        // segments expire in size
+        for (int i = 1; i <= (2 * msgPerSeg + 2); i++) {
+            log.appendAsLeader(createRecords.get(), 0);
+        }
+        assertEquals(3, log.numberOfSegments(), "There should be exactly 3 
segments.");
+
+        // from == to should always be null
+        assertEquals(List.of(), getSegmentOffsets(log, 10, 10));
+        assertEquals(List.of(), getSegmentOffsets(log, 15, 15));
+
+        assertEquals(List.of(0L, 10L, 20L), getSegmentOffsets(log, 0, 21));
+
+        assertEquals(List.of(0L), getSegmentOffsets(log, 1, 5));
+        assertEquals(List.of(10L, 20L), getSegmentOffsets(log, 13, 21));
+        assertEquals(List.of(10L), getSegmentOffsets(log, 13, 17));
+
+        // from > to is bad
+        assertThrows(IllegalArgumentException.class, () -> log.logSegments(10, 
0));
+    }
+
+    private List<Long> getSegmentOffsets(UnifiedLog log, long from, long to) {
+        return log.logSegments(from, 
to).stream().map(LogSegment::baseOffset).toList();
+    }
+
+    @Test
+    public void testInitializationOfProducerSnapshotsUpgradePath() throws 
IOException {
+        // simulate the upgrade path by creating a new log with several 
segments, deleting the
+        // snapshot files, and then reloading the log
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(64 * 10).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        assertEquals(OptionalLong.empty(), log.oldestProducerSnapshotOffset());
+
+        for (int i = 0; i <= 100; i++) {
+            SimpleRecord record = new SimpleRecord(mockTime.milliseconds(), 
String.valueOf(i).getBytes());
+            log.appendAsLeader(LogTestUtils.records(List.of(record)), 0);
+        }
+        assertTrue(log.logSegments().size() >= 2);
+        long logEndOffset = log.logEndOffset();
+        log.close();
+
+        LogTestUtils.deleteProducerSnapshotFiles(logDir);
+
+        // Reload after clean shutdown
+        log = createLog(logDir, logConfig, 0L, logEndOffset, brokerTopicStats, 
mockTime.scheduler, mockTime,
+                producerStateManagerConfig, true, Optional.empty(), false);
+        List<Long> segmentOffsets = log.logSegments().stream()
+                .map(LogSegment::baseOffset)
+                .toList();
+        int size = segmentOffsets.size();
+        List<Long> expectedSnapshotOffsets = new ArrayList<>(size >= 2 ? 
segmentOffsets.subList(size - 2, size) : segmentOffsets);
+        expectedSnapshotOffsets.add(log.logEndOffset());
+        assertEquals(expectedSnapshotOffsets, 
LogTestUtils.listProducerSnapshotOffsets(logDir));
+        log.close();
+
+        LogTestUtils.deleteProducerSnapshotFiles(logDir);
+
+        // Reload after unclean shutdown with recoveryPoint set to log end 
offset
+        log = createLog(logDir, logConfig, 0L, logEndOffset, brokerTopicStats, 
mockTime.scheduler, mockTime,
+                producerStateManagerConfig, false, Optional.empty(), false);
+        assertEquals(expectedSnapshotOffsets, 
LogTestUtils.listProducerSnapshotOffsets(logDir));
+        log.close();
+
+        LogTestUtils.deleteProducerSnapshotFiles(logDir);
+
+        // Reload after unclean shutdown with recoveryPoint set to 0
+        log = createLog(logDir, logConfig, 0L, 0L, brokerTopicStats, 
mockTime.scheduler, mockTime,
+                producerStateManagerConfig, false, Optional.empty(), false);
+        // We progressively create a snapshot for each segment after the 
recovery point
+        segmentOffsets = log.logSegments().stream()
+                .map(LogSegment::baseOffset)
+                .toList();
+        expectedSnapshotOffsets = new ArrayList<>(segmentOffsets.subList(1, 
segmentOffsets.size()));
+        expectedSnapshotOffsets.add(log.logEndOffset());
+        assertEquals(expectedSnapshotOffsets, 
LogTestUtils.listProducerSnapshotOffsets(logDir));
+        log.close();
+    }
+
+    @Test
+    public void testLogReinitializeAfterManualDelete() throws IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().build();
+        // simulate a case where log data does not exist but the start offset 
is non-zero
+        UnifiedLog log = createLog(logDir, logConfig, 500L, 0L, 
brokerTopicStats, mockTime.scheduler, mockTime,
+                producerStateManagerConfig, true, Optional.empty(), false);
+        assertEquals(500, log.logStartOffset());
+        assertEquals(500, log.logEndOffset());
+    }
+
+    /**
+     * Test that "PeriodicProducerExpirationCheck" scheduled task gets 
canceled after log
+     * is deleted.
+     */
+    @Test
+    public void testProducerExpireCheckAfterDelete() throws Exception {
+        KafkaScheduler scheduler = new KafkaScheduler(1);
+        try {
+            scheduler.startup();
+            LogConfig logConfig = new LogTestUtils.LogConfigBuilder().build();
+            UnifiedLog log = createLog(logDir, logConfig, 0L, 0L, 
brokerTopicStats, scheduler, mockTime,
+                    producerStateManagerConfig, true, Optional.empty(), false);
+
+            ScheduledFuture<?> producerExpireCheck = log.producerExpireCheck();
+            assertTrue(scheduler.taskRunning(producerExpireCheck), 
"producerExpireCheck isn't as part of scheduled tasks");
+
+            log.delete();
+            assertFalse(scheduler.taskRunning(producerExpireCheck),
+                    "producerExpireCheck is part of scheduled tasks even after 
log deletion");
+        } finally {
+            scheduler.shutdown();
+        }
+    }
+
+    @Test
+    public void testProducerIdMapOffsetUpdatedForNonIdempotentData() throws 
IOException {
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        MemoryRecords records = LogTestUtils.records(List.of(new 
SimpleRecord(mockTime.milliseconds(), "key".getBytes(), "value".getBytes())));
+        log.appendAsLeader(records, 0);
+        log.takeProducerSnapshot();
+        assertEquals(OptionalLong.of(1), log.latestProducerSnapshotOffset());
+    }
+
+    @Test
+    public void testRebuildProducerIdMapWithCompactedData() throws IOException 
{
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        long pid = 1L;
+        short producerEpoch = 0;
+        int partitionLeaderEpoch = 0;
+        int seq = 0;
+        long baseOffset = 23L;
+
+        // create a batch with a couple gaps to simulate compaction
+        MemoryRecords records = LogTestUtils.records(
+                List.of(
+                        new SimpleRecord(mockTime.milliseconds(), 
"a".getBytes()),
+                        new SimpleRecord(mockTime.milliseconds(), 
"key".getBytes(), "b".getBytes()),
+                        new SimpleRecord(mockTime.milliseconds(), 
"c".getBytes()),
+                        new SimpleRecord(mockTime.milliseconds(), 
"key".getBytes(), "d".getBytes())
+                ),
+                pid, producerEpoch, seq, baseOffset
+        );
+        records.batches().forEach(b -> 
b.setPartitionLeaderEpoch(partitionLeaderEpoch));
+
+        ByteBuffer filtered = ByteBuffer.allocate(2048);
+        records.filterTo(new MemoryRecords.RecordFilter(0, 0) {
+            @Override
+            public MemoryRecords.RecordFilter.BatchRetentionResult 
checkBatchRetention(RecordBatch batch) {
+                return new 
MemoryRecords.RecordFilter.BatchRetentionResult(MemoryRecords.RecordFilter.BatchRetention.DELETE_EMPTY,
 false);
+            }
+            @Override 
+            public boolean shouldRetainRecord(RecordBatch recordBatch, Record 
record) {
+                return !record.hasKey();
+            }
+        }, filtered, BufferSupplier.NO_CACHING);
+        filtered.flip();
+        MemoryRecords filteredRecords = 
MemoryRecords.readableRecords(filtered);
+
+        log.appendAsFollower(filteredRecords, partitionLeaderEpoch);
+
+        // append some more data and then truncate to force rebuilding of the 
PID map
+        MemoryRecords moreRecords = LogTestUtils.records(
+                List.of(
+                        new SimpleRecord(mockTime.milliseconds(), 
"e".getBytes()),
+                        new SimpleRecord(mockTime.milliseconds(), 
"f".getBytes())),
+                baseOffset + 4, RecordBatch.NO_PARTITION_LEADER_EPOCH
+        );
+        appendAsFollower(log, moreRecords, partitionLeaderEpoch);
+
+        log.truncateTo(baseOffset + 4);
+
+        Map<Long, Integer> activeProducers = 
log.activeProducersWithLastSequence();
+        assertTrue(activeProducers.containsKey(pid));
+
+        int lastSeq = activeProducers.get(pid);
+        assertEquals(3, lastSeq);
+    }
+
+    @Test
+    public void testRebuildProducerStateWithEmptyCompactedBatch() throws 
IOException {
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        long pid = 1L;
+        short producerEpoch = 0;
+        int partitionLeaderEpoch = 0;
+        int seq = 0;
+        long baseOffset = 23L;
+
+        // create an empty batch
+        MemoryRecords records = LogTestUtils.records(
+                List.of(
+                        new SimpleRecord(mockTime.milliseconds(), 
"key".getBytes(), "a".getBytes()),
+                        new SimpleRecord(mockTime.milliseconds(), 
"key".getBytes(), "b".getBytes())),
+                pid, producerEpoch, seq, baseOffset
+        );
+        records.batches().forEach(b -> 
b.setPartitionLeaderEpoch(partitionLeaderEpoch));
+
+        ByteBuffer filtered = ByteBuffer.allocate(2048);
+        records.filterTo(new MemoryRecords.RecordFilter(0, 0) {
+            @Override 
+            public MemoryRecords.RecordFilter.BatchRetentionResult 
checkBatchRetention(RecordBatch batch) {
+                return new 
MemoryRecords.RecordFilter.BatchRetentionResult(MemoryRecords.RecordFilter.BatchRetention.RETAIN_EMPTY,
 true);
+            }
+            @Override public boolean shouldRetainRecord(RecordBatch 
recordBatch, Record record) {
+                return false;
+            }
+        }, filtered, BufferSupplier.NO_CACHING);
+        filtered.flip();
+        MemoryRecords filteredRecords = 
MemoryRecords.readableRecords(filtered);
+
+        log.appendAsFollower(filteredRecords, partitionLeaderEpoch);
+
+        // append some more data and then truncate to force rebuilding of the 
PID map
+        MemoryRecords moreRecords = LogTestUtils.records(
+                List.of(
+                        new SimpleRecord(mockTime.milliseconds(), 
"e".getBytes()),
+                        new SimpleRecord(mockTime.milliseconds(), 
"f".getBytes())),
+                baseOffset + 2, RecordBatch.NO_PARTITION_LEADER_EPOCH
+        );
+        appendAsFollower(log, moreRecords, partitionLeaderEpoch);
+
+        log.truncateTo(baseOffset + 2);
+
+        Map<Long, Integer> activeProducers = 
log.activeProducersWithLastSequence();
+        assertTrue(activeProducers.containsKey(pid));
+
+        int lastSeq = activeProducers.get(pid);
+        assertEquals(1, lastSeq);
+    }
+
+    @Test
+    public void testUpdateProducerIdMapWithCompactedData() throws IOException {
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        long pid = 1L;
+        short producerEpoch = 0;
+        int partitionLeaderEpoch = 0;
+        int seq = 0;
+        long baseOffset = 23L;
+
+        // create a batch with a couple gaps to simulate compaction
+        MemoryRecords records = LogTestUtils.records(
+                List.of(
+                        new SimpleRecord(mockTime.milliseconds(), 
"a".getBytes()),
+                        new SimpleRecord(mockTime.milliseconds(), 
"key".getBytes(), "b".getBytes()),
+                        new SimpleRecord(mockTime.milliseconds(), 
"c".getBytes()),
+                        new SimpleRecord(mockTime.milliseconds(), 
"key".getBytes(), "d".getBytes())),
+                pid, producerEpoch, seq, baseOffset
+        );
+        records.batches().forEach(b -> 
b.setPartitionLeaderEpoch(partitionLeaderEpoch));
+
+        ByteBuffer filtered = ByteBuffer.allocate(2048);
+        records.filterTo(new MemoryRecords.RecordFilter(0, 0) {
+            @Override public MemoryRecords.RecordFilter.BatchRetentionResult 
checkBatchRetention(RecordBatch batch) {
+                return new 
MemoryRecords.RecordFilter.BatchRetentionResult(MemoryRecords.RecordFilter.BatchRetention.DELETE_EMPTY,
 false);
+            }
+            @Override public boolean shouldRetainRecord(RecordBatch 
recordBatch, Record record) {
+                return !record.hasKey();
+            }
+        }, filtered, BufferSupplier.NO_CACHING);
+        filtered.flip();
+        MemoryRecords filteredRecords = 
MemoryRecords.readableRecords(filtered);
+
+        log.appendAsFollower(filteredRecords, partitionLeaderEpoch);
+        Map<Long, Integer> activeProducers = 
log.activeProducersWithLastSequence();
+        assertTrue(activeProducers.containsKey(pid));
+
+        int lastSeq = activeProducers.get(pid);
+        assertEquals(3, lastSeq);
+    }
+
+    @Test
+    public void testProducerIdMapTruncateTo() throws IOException {
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        log.appendAsLeader(LogTestUtils.records(List.of(new 
SimpleRecord("a".getBytes()))), 0);
+        log.appendAsLeader(LogTestUtils.records(List.of(new 
SimpleRecord("b".getBytes()))), 0);
+        log.takeProducerSnapshot();
+
+        log.appendAsLeader(LogTestUtils.records(List.of(new 
SimpleRecord("c".getBytes()))), 0);
+        log.takeProducerSnapshot();
+
+        log.truncateTo(2);
+        assertEquals(OptionalLong.of(2), log.latestProducerSnapshotOffset());
+        assertEquals(2, log.latestProducerStateEndOffset());
+
+        log.truncateTo(1);
+        assertEquals(OptionalLong.of(1), log.latestProducerSnapshotOffset());
+        assertEquals(1, log.latestProducerStateEndOffset());
+
+        log.truncateTo(0);
+        assertEquals(OptionalLong.empty(), log.latestProducerSnapshotOffset());
+        assertEquals(0, log.latestProducerStateEndOffset());
+    }
+
+    @Test
+    public void testProducerIdMapTruncateToWithNoSnapshots() throws 
IOException {
+        // This ensures that the upgrade optimization path cannot be hit after 
initial loading
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        long pid = 1L;
+        short epoch = 0;
+
+        log.appendAsLeader(LogTestUtils.records(
+                List.of(new SimpleRecord("a".getBytes())),
+                pid, epoch, 0, 0L), 0);
+        log.appendAsLeader(LogTestUtils.records(
+                List.of(new SimpleRecord("b".getBytes())),
+                pid, epoch, 1, 0L), 0);
+
+        LogTestUtils.deleteProducerSnapshotFiles(logDir);
+
+        log.truncateTo(1L);
+        assertEquals(1, log.activeProducersWithLastSequence().size());
+
+        int lastSeq = log.activeProducersWithLastSequence().get(pid);
+        assertEquals(0, lastSeq);
+    }
+
+    @ParameterizedTest(name = "testRetentionDeletesProducerStateSnapshots with 
createEmptyActiveSegment: {0}")
+    @ValueSource(booleans = {true, false})
+    public void testRetentionDeletesProducerStateSnapshots(boolean 
createEmptyActiveSegment) throws IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(TEN_KB)
+                .retentionBytes(0)
+                .retentionMs(1000 * 60)
+                .fileDeleteDelayMs(0)
+                .build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        long pid1 = 1L;
+        short epoch = 0;
+
+        log.appendAsLeader(LogTestUtils.records(
+                List.of(new SimpleRecord("a".getBytes())),
+                pid1, epoch, 0, 0L), 0);
+        log.roll();
+        log.appendAsLeader(LogTestUtils.records(
+                List.of(new SimpleRecord("b".getBytes())),
+                pid1, epoch, 1, 0L), 0);
+        log.roll();
+        log.appendAsLeader(LogTestUtils.records(
+                List.of(new SimpleRecord("c".getBytes())),
+                pid1, epoch, 2, 0L), 0);
+        if (createEmptyActiveSegment) {
+            log.roll();
+        }
+
+        log.updateHighWatermark(log.logEndOffset());
+
+        int numProducerSnapshots = createEmptyActiveSegment ? 3 : 2;
+        assertEquals(numProducerSnapshots, 
ProducerStateManager.listSnapshotFiles(logDir).size());
+        // Sleep to breach the retention period
+        mockTime.sleep(1000 * 60 + 1);
+        assertTrue(log.deleteOldSegments() > 0, "At least one segment should 
be deleted");
+        // Sleep to breach the file delete delay and run scheduled file 
deletion tasks
+        mockTime.sleep(1);
+        assertEquals(1, ProducerStateManager.listSnapshotFiles(logDir).size(),
+                "expect a single producer state snapshot remaining");
+        assertEquals(3, log.logStartOffset());
+    }
+
+    private void assertValidLogOffsetMetadata(UnifiedLog log, 
LogOffsetMetadata offsetMetadata) throws IOException {
+        assertFalse(offsetMetadata.messageOffsetOnly());
+
+        long segmentBaseOffset = offsetMetadata.segmentBaseOffset;
+        List<LogSegment> segments = log.logSegments(segmentBaseOffset, 
segmentBaseOffset + 1);
+        assertFalse(segments.isEmpty());
+
+        LogSegment segment = segments.iterator().next();
+        assertEquals(segmentBaseOffset, segment.baseOffset());
+        assertTrue(offsetMetadata.relativePositionInSegment <= segment.size());
+
+        FetchDataInfo readInfo = segment.read(offsetMetadata.messageOffset,
+                2048,
+                Optional.of((long) segment.size()),
+                false);
+
+        if (offsetMetadata.relativePositionInSegment < segment.size()) {
+            assertEquals(offsetMetadata, readInfo.fetchOffsetMetadata);
+        } else {
+            assertNull(readInfo);
+        }
+    }
+
     private void append(int epoch, long startOffset, int count) {
         Function<Integer, MemoryRecords> records = i ->
                 records(List.of(new SimpleRecord("value".getBytes())), 
startOffset + i, epoch);
@@ -630,6 +1632,11 @@ public class UnifiedLogTest {
         }
     }
 
+    private void appendAsFollower(UnifiedLog log, MemoryRecords records, int 
leaderEpoch) {
+        records.batches().forEach(b -> b.setPartitionLeaderEpoch(leaderEpoch));
+        log.appendAsFollower(records, leaderEpoch);
+    }
+
     private LeaderEpochFileCache epochCache(UnifiedLog log) {
         return log.leaderEpochCache();
     }
@@ -639,25 +1646,28 @@ public class UnifiedLogTest {
     }
 
     private UnifiedLog createLog(File dir, LogConfig config, boolean 
remoteStorageSystemEnable) throws IOException {
-        return createLog(dir, config, this.brokerTopicStats, 
mockTime.scheduler, this.mockTime,
-                this.producerStateManagerConfig, Optional.empty(), 
remoteStorageSystemEnable);
+        return createLog(dir, config, 0L, 0L, brokerTopicStats, 
mockTime.scheduler, mockTime,
+                producerStateManagerConfig, true, Optional.empty(), 
remoteStorageSystemEnable);
     }
 
     private UnifiedLog createLog(
             File dir,
             LogConfig config,
+            long logStartOffset,
+            long recoveryPoint,
             BrokerTopicStats brokerTopicStats,
             Scheduler scheduler,
             MockTime time,
             ProducerStateManagerConfig producerStateManagerConfig,
+            boolean lastShutdownClean,
             Optional<Uuid> topicId,
             boolean remoteStorageSystemEnable) throws IOException {
 
         UnifiedLog log = UnifiedLog.create(
                 dir,
                 config,
-                0L,
-                0L,
+                logStartOffset,
+                recoveryPoint,
                 scheduler,
                 brokerTopicStats,
                 time,
@@ -665,14 +1675,14 @@ public class UnifiedLogTest {
                 producerStateManagerConfig,
                 
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
                 new LogDirFailureChannel(10),
-                true,
+                lastShutdownClean,
                 topicId,
                 new ConcurrentHashMap<>(),
                 remoteStorageSystemEnable,
                 LogOffsetsListener.NO_OP_OFFSETS_LISTENER
         );
 
-        this.logsToClose.add(log);
+        logsToClose.add(log);
         return log;
     }
 

Reply via email to