This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3a950f45ea5 KAFKA-19752 Move UnifiedLogTest to storage module (1/N) (#20635)
3a950f45ea5 is described below
commit 3a950f45ea5ecfb81e4aa7de84e388ea6c68b38a
Author: TaiJuWu <[email protected]>
AuthorDate: Wed Dec 24 18:26:51 2025 +0800
KAFKA-19752 Move UnifiedLogTest to storage module (1/N) (#20635)
This PR migrates several unit tests from UnifiedLogTest.scala to
UnifiedLogTest.java as part of the ongoing effort to convert core broker
logic and tests to Java.
The following test methods have been rewritten and moved:
- testDeleteOldSegments
- testLogDeletionAfterClose
- testLogDeletionAfterDeleteRecords
- shouldDeleteSizeBasedSegments
- shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize
- shouldDeleteTimeBasedSegmentsReadyToBeDeleted
- shouldNotDeleteTimeBasedSegmentsWhenNoneReadyToBeDeleted
- shouldNotDeleteSegmentsWhenPolicyDoesNotIncludeDelete
- shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete
- shouldDeleteLocalLogSegmentsWhenPolicyIsEmptyWithSizeRetention
- shouldDeleteLocalLogSegmentsWhenPolicyIsEmptyWithMsRetention
- shouldDeleteStartOffsetBreachedSegmentsWhenPolicyDoesNotIncludeDelete
- shouldApplyEpochToMessageOnAppendIfLeader
- followerShouldSaveEpochInformationFromReplicatedMessagesToTheEpochCache
- shouldTruncateLeaderEpochsWhenDeletingSegments
- shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments
- shouldTruncateLeaderEpochCheckpointFileWhenTruncatingLog
- testFirstUnstableOffsetNoTransactionalData
- testFirstUnstableOffsetWithTransactionalData
New utility methods: to support the Java tests, a LogConfigBuilder and
transactional append helpers were added to LogTestUtils.java (see the
usage sketch below).
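As an illustrative sketch only (not part of this patch), the new helpers are
used roughly as follows in the migrated tests; log, pid, producerEpoch and
mockTime are assumed to be fields of the surrounding test class:

    // Build a per-test LogConfig with the new builder.
    LogConfig config = new LogTestUtils.LogConfigBuilder()
        .segmentBytes(1024)
        .retentionMs(10000L)
        .cleanupPolicy("compact,delete")
        .build();

    // Append a COMMIT marker as leader via the new transactional helper.
    // The trailing argument selects the transaction version used for validation.
    LogAppendInfo commitInfo = LogTestUtils.appendEndTxnMarkerAsLeader(
        log, pid, producerEpoch, ControlRecordType.COMMIT,
        mockTime.milliseconds(), 0 /* coordinatorEpoch */, 0 /* leaderEpoch */,
        TransactionVersion.TV_0.featureLevel());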
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../test/scala/unit/kafka/log/UnifiedLogTest.scala | 519 +--------------
.../kafka/storage/internals/log/LogTestUtils.java | 122 ++++
.../storage/internals/log/UnifiedLogTest.java | 713 +++++++++++++++++++++
.../org/apache/kafka/common/test/TestUtils.java | 1 +
4 files changed, 862 insertions(+), 493 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index d38e65e29f1..eb54319eb94 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -52,7 +52,6 @@ import org.junit.jupiter.params.provider.{EnumSource, ValueSource}
import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers.{any, anyLong}
import org.mockito.Mockito.{doAnswer, doThrow, spy}
-
import net.jqwik.api.AfterFailureMode
import net.jqwik.api.ForAll
import net.jqwik.api.Property
@@ -1016,7 +1015,7 @@ class UnifiedLogTest {
assertEquals(numProducerSnapshots,
ProducerStateManager.listSnapshotFiles(logDir).size)
// Sleep to breach the retention period
mockTime.sleep(1000 * 60 + 1)
- log.deleteOldSegments()
+ assertTrue(log.deleteOldSegments > 0, "At least one segment should be deleted")
// Sleep to breach the file delete delay and run scheduled file deletion
tasks
mockTime.sleep(1)
assertEquals(1, ProducerStateManager.listSnapshotFiles(logDir).size,
@@ -1065,7 +1064,7 @@ class UnifiedLogTest {
// Increment the log start offset to exclude the first two segments.
log.maybeIncrementLogStartOffset(log.logEndOffset - 1,
LogStartOffsetIncrementReason.ClientRecordDeletion)
- log.deleteOldSegments()
+ assertTrue(log.deleteOldSegments > 0, "At least one segment should be deleted")
// Sleep to breach the file delete delay and run scheduled file deletion
tasks
mockTime.sleep(1)
assertEquals(1, ProducerStateManager.listSnapshotFiles(logDir).size,
@@ -1103,7 +1102,8 @@ class UnifiedLogTest {
// Clean segments, this should delete everything except the active segment
since there only
// exists the key "a".
cleaner.clean(new LogToClean(log, 0, log.logEndOffset, false))
- log.deleteOldSegments()
+ // There is no other key, so we don't delete anything
+ assertEquals(0, log.deleteOldSegments())
// Sleep to breach the file delete delay and run scheduled file deletion
tasks
mockTime.sleep(1)
assertEquals(log.logSegments.asScala.map(_.baseOffset).toSeq.sorted.drop(1),
ProducerStateManager.listSnapshotFiles(logDir).asScala.map(_.offset).sorted,
@@ -1166,7 +1166,7 @@ class UnifiedLogTest {
assertEquals(util.Set.of(pid1, pid2),
log.activeProducersWithLastSequence.keySet)
log.updateHighWatermark(log.logEndOffset)
- log.deleteOldSegments()
+ assertTrue(log.deleteOldSegments > 0, "At least one segment should be deleted")
// Producer state should not be removed when deleting log segment
assertEquals(2, log.logSegments.size)
@@ -1547,7 +1547,7 @@ class UnifiedLogTest {
log.updateHighWatermark(log.logEndOffset)
log.maybeIncrementLogStartOffset(2L,
LogStartOffsetIncrementReason.ClientRecordDeletion)
- log.deleteOldSegments() // force retention to kick in so that the snapshot files are cleaned up.
+ assertTrue(log.deleteOldSegments > 0, "At least one segment should be deleted") // force retention to kick in so that the snapshot files are cleaned up.
mockTime.sleep(logConfig.fileDeleteDelayMs + 1000) // advance the clock so
file deletion takes place
// Deleting records should not remove producer state but should delete
snapshots after the file deletion delay.
@@ -2723,7 +2723,7 @@ class UnifiedLogTest {
val oldFiles = segments.map(_.log.file) ++ segments.map(_.offsetIndexFile)
log.updateHighWatermark(log.logEndOffset)
- log.deleteOldSegments()
+ assertTrue(log.deleteOldSegments > 0, "At least one segment should be deleted")
assertEquals(1, log.numberOfSegments, "Only one segment should remain.")
assertTrue(segments.forall(_.log.file.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX))
&&
@@ -3015,476 +3015,8 @@ class UnifiedLogTest {
assertFalse(LogTestUtils.hasOffsetOverflow(log))
}
- @Test
- def testDeleteOldSegments(): Unit = {
- def createRecords = TestUtils.singletonRecords(value = "test".getBytes,
timestamp = mockTime.milliseconds - 1000)
- val logConfig = LogTestUtils.createLogConfig(segmentBytes =
createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
- val log = createLog(logDir, logConfig)
-
- // append some messages to create some segments
- for (_ <- 0 until 100)
- log.appendAsLeader(createRecords, 0)
-
- log.assignEpochStartOffset(0, 40)
- log.assignEpochStartOffset(1, 90)
-
- // segments are not eligible for deletion if no high watermark has been set
- val numSegments = log.numberOfSegments
- log.deleteOldSegments()
- assertEquals(numSegments, log.numberOfSegments)
- assertEquals(0L, log.logStartOffset)
-
- // only segments with offset before the current high watermark are
eligible for deletion
- for (hw <- 25 to 30) {
- log.updateHighWatermark(hw)
- log.deleteOldSegments()
- assertTrue(log.logStartOffset <= hw)
- log.logSegments.forEach { segment =>
- val segmentFetchInfo = segment.read(segment.baseOffset, Int.MaxValue)
- val segmentLastOffsetOpt =
segmentFetchInfo.records.records.asScala.lastOption.map(_.offset)
- segmentLastOffsetOpt.foreach { lastOffset =>
- assertTrue(lastOffset >= hw)
- }
- }
- }
-
- // expire all segments
- log.updateHighWatermark(log.logEndOffset)
- log.deleteOldSegments()
- assertEquals(1, log.numberOfSegments, "The deleted segments should be
gone.")
- assertEquals(1, epochCache(log).epochEntries.size, "Epoch entries should
have gone.")
- assertEquals(new EpochEntry(1, 100), epochCache(log).epochEntries.get(0),
"Epoch entry should be the latest epoch and the leo.")
-
- // append some messages to create some segments
- for (_ <- 0 until 100)
- log.appendAsLeader(createRecords, 0)
-
- log.delete()
- assertEquals(0, log.numberOfSegments, "The number of segments should be 0")
- assertEquals(0, log.deleteOldSegments(), "The number of deleted segments
should be zero.")
- assertEquals(0, epochCache(log).epochEntries.size, "Epoch entries should
have gone.")
- }
-
- @Test
- def testLogDeletionAfterClose(): Unit = {
- def createRecords = TestUtils.singletonRecords(value = "test".getBytes,
timestamp = mockTime.milliseconds - 1000)
- val logConfig = LogTestUtils.createLogConfig(segmentBytes =
createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
- val log = createLog(logDir, logConfig)
-
- // append some messages to create some segments
- log.appendAsLeader(createRecords, 0)
-
- assertEquals(1, log.numberOfSegments, "The deleted segments should be
gone.")
- assertEquals(1, epochCache(log).epochEntries.size, "Epoch entries should
have gone.")
-
- log.close()
- log.delete()
- assertEquals(0, log.numberOfSegments, "The number of segments should be 0")
- assertEquals(0, epochCache(log).epochEntries.size, "Epoch entries should
have gone.")
- }
-
- @Test
- def testLogDeletionAfterDeleteRecords(): Unit = {
- def createRecords = TestUtils.singletonRecords("test".getBytes)
- val logConfig = LogTestUtils.createLogConfig(segmentBytes =
createRecords.sizeInBytes * 5)
- val log = createLog(logDir, logConfig)
-
- for (_ <- 0 until 15)
- log.appendAsLeader(createRecords, 0)
- assertEquals(3, log.numberOfSegments, "should have 3 segments")
- assertEquals(log.logStartOffset, 0)
- log.updateHighWatermark(log.logEndOffset)
-
- log.maybeIncrementLogStartOffset(1,
LogStartOffsetIncrementReason.ClientRecordDeletion)
- log.deleteOldSegments()
- assertEquals(3, log.numberOfSegments, "should have 3 segments")
- assertEquals(log.logStartOffset, 1)
-
- log.maybeIncrementLogStartOffset(6,
LogStartOffsetIncrementReason.ClientRecordDeletion)
- log.deleteOldSegments()
- assertEquals(2, log.numberOfSegments, "should have 2 segments")
- assertEquals(log.logStartOffset, 6)
-
- log.maybeIncrementLogStartOffset(15,
LogStartOffsetIncrementReason.ClientRecordDeletion)
- log.deleteOldSegments()
- assertEquals(1, log.numberOfSegments, "should have 1 segments")
- assertEquals(log.logStartOffset, 15)
- }
-
def epochCache(log: UnifiedLog): LeaderEpochFileCache = log.leaderEpochCache
- @Test
- def shouldDeleteSizeBasedSegments(): Unit = {
- def createRecords = TestUtils.singletonRecords("test".getBytes)
- val logConfig = LogTestUtils.createLogConfig(segmentBytes =
createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10)
- val log = createLog(logDir, logConfig)
-
- // append some messages to create some segments
- for (_ <- 0 until 15)
- log.appendAsLeader(createRecords, 0)
-
- log.updateHighWatermark(log.logEndOffset)
- log.deleteOldSegments()
- assertEquals(2,log.numberOfSegments, "should have 2 segments")
- }
-
- @Test
- def shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize(): Unit = {
- def createRecords = TestUtils.singletonRecords("test".getBytes)
- val logConfig = LogTestUtils.createLogConfig(segmentBytes =
createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 15)
- val log = createLog(logDir, logConfig)
-
- // append some messages to create some segments
- for (_ <- 0 until 15)
- log.appendAsLeader(createRecords, 0)
-
- log.updateHighWatermark(log.logEndOffset)
- log.deleteOldSegments()
- assertEquals(3,log.numberOfSegments, "should have 3 segments")
- }
-
- @Test
- def shouldDeleteTimeBasedSegmentsReadyToBeDeleted(): Unit = {
- def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp
= 10)
- val logConfig = LogTestUtils.createLogConfig(segmentBytes =
createRecords.sizeInBytes * 5, retentionMs = 10000)
- val log = createLog(logDir, logConfig)
-
- // append some messages to create some segments
- for (_ <- 0 until 15)
- log.appendAsLeader(createRecords, 0)
-
- log.updateHighWatermark(log.logEndOffset)
- log.deleteOldSegments()
- assertEquals(1, log.numberOfSegments, "There should be 1 segment
remaining")
- }
-
- @Test
- def shouldNotDeleteTimeBasedSegmentsWhenNoneReadyToBeDeleted(): Unit = {
- def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp
= mockTime.milliseconds)
- val logConfig = LogTestUtils.createLogConfig(segmentBytes =
createRecords.sizeInBytes * 5, retentionMs = 10000000)
- val log = createLog(logDir, logConfig)
-
- // append some messages to create some segments
- for (_ <- 0 until 15)
- log.appendAsLeader(createRecords, 0)
-
- log.updateHighWatermark(log.logEndOffset)
- log.deleteOldSegments()
- assertEquals(3, log.numberOfSegments, "There should be 3 segments
remaining")
- }
-
- @Test
- def shouldNotDeleteSegmentsWhenPolicyDoesNotIncludeDelete(): Unit = {
- def createRecords = TestUtils.singletonRecords("test".getBytes, key =
"test".getBytes(), timestamp = 10L)
- val logConfig = LogTestUtils.createLogConfig(segmentBytes =
createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = "compact")
- val log = createLog(logDir, logConfig)
-
- // append some messages to create some segments
- for (_ <- 0 until 15)
- log.appendAsLeader(createRecords, 0)
-
- // mark the oldest segment as older the retention.ms
- log.logSegments.asScala.head.setLastModified(mockTime.milliseconds - 20000)
-
- val segments = log.numberOfSegments
- log.updateHighWatermark(log.logEndOffset)
- log.deleteOldSegments()
- assertEquals(segments, log.numberOfSegments, "There should be 3 segments
remaining")
- }
-
- @Test
- def shouldDeleteLocalLogSegmentsWhenPolicyIsEmptyWithSizeRetention(): Unit =
{
- def createRecords = TestUtils.singletonRecords("test".getBytes, key =
"test".getBytes(), timestamp = 10L)
- val recordSize = createRecords.sizeInBytes
- val logConfig = LogTestUtils.createLogConfig(
- segmentBytes = recordSize * 2,
- localRetentionBytes = recordSize / 2,
- cleanupPolicy = "",
- remoteLogStorageEnable = true
- )
- val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
-
- for (_ <- 0 until 10)
- log.appendAsLeader(createRecords, 0)
-
- val segmentsBefore = log.numberOfSegments
- log.updateHighWatermark(log.logEndOffset)
- log.updateHighestOffsetInRemoteStorage(log.logEndOffset - 1)
- val deleteOldSegments = log.deleteOldSegments()
-
- assertTrue(log.numberOfSegments < segmentsBefore, "Some segments should be
deleted due to size retention")
- assertTrue(deleteOldSegments > 0, "At least one segment should be deleted")
- }
-
- @Test
- def shouldDeleteLocalLogSegmentsWhenPolicyIsEmptyWithMsRetention(): Unit = {
- val oldTimestamp = mockTime.milliseconds - 20000
- def oldRecords = TestUtils.singletonRecords("test".getBytes, key =
"test".getBytes(), timestamp = oldTimestamp)
- val recordSize = oldRecords.sizeInBytes
- val logConfig = LogTestUtils.createLogConfig(
- segmentBytes = recordSize * 2,
- localRetentionMs = 5000,
- cleanupPolicy = "",
- remoteLogStorageEnable = true
- )
- val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
-
- for (_ <- 0 until 10)
- log.appendAsLeader(oldRecords, 0)
-
- def newRecords = TestUtils.singletonRecords("test".getBytes, key =
"test".getBytes(), timestamp = mockTime.milliseconds)
- for (_ <- 0 until 5)
- log.appendAsLeader(newRecords, 0)
-
- val segmentsBefore = log.numberOfSegments
-
- log.updateHighWatermark(log.logEndOffset)
- log.updateHighestOffsetInRemoteStorage(log.logEndOffset - 1)
- val deleteOldSegments = log.deleteOldSegments()
-
- assertTrue(log.numberOfSegments < segmentsBefore, "Some segments should be
deleted due to time retention")
- assertTrue(deleteOldSegments > 0, "At least one segment should be deleted")
- }
-
- @Test
- def
shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete(): Unit
= {
- def createRecords = TestUtils.singletonRecords("test".getBytes, key =
"test".getBytes, timestamp = 10L)
- val logConfig = LogTestUtils.createLogConfig(segmentBytes =
createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy =
"compact,delete")
- val log = createLog(logDir, logConfig)
-
- // append some messages to create some segments
- for (_ <- 0 until 15)
- log.appendAsLeader(createRecords, 0)
-
- log.updateHighWatermark(log.logEndOffset)
- log.deleteOldSegments()
- assertEquals(1, log.numberOfSegments, "There should be 1 segment
remaining")
- }
-
- @Test
- def shouldDeleteStartOffsetBreachedSegmentsWhenPolicyDoesNotIncludeDelete():
Unit = {
- def createRecords = TestUtils.singletonRecords("test".getBytes, key =
"test".getBytes, timestamp = 10L)
- val recordsPerSegment = 5
- val logConfig = LogTestUtils.createLogConfig(segmentBytes =
createRecords.sizeInBytes * recordsPerSegment, retentionMs = 10000,
cleanupPolicy = "compact")
- val log = createLog(logDir, logConfig, brokerTopicStats)
-
- // append some messages to create some segments
- for (_ <- 0 until 15)
- log.appendAsLeader(createRecords, 0)
-
- // Three segments should be created
- assertEquals(3, log.logSegments.asScala.count(_ => true))
- log.updateHighWatermark(log.logEndOffset)
- log.maybeIncrementLogStartOffset(recordsPerSegment,
LogStartOffsetIncrementReason.ClientRecordDeletion)
-
- // The first segment, which is entirely before the log start offset,
should be deleted
- // Of the remaining the segments, the first can overlap the log start
offset and the rest must have a base offset
- // greater than the start offset
- log.updateHighWatermark(log.logEndOffset)
- log.deleteOldSegments()
- assertEquals(2, log.numberOfSegments, "There should be 2 segments
remaining")
- assertTrue(log.logSegments.asScala.head.baseOffset <= log.logStartOffset)
- assertTrue(log.logSegments.asScala.tail.forall(s => s.baseOffset >
log.logStartOffset))
- }
-
- @Test
- def shouldApplyEpochToMessageOnAppendIfLeader(): Unit = {
- val records = (0 until 50).toArray.map(id => new
SimpleRecord(id.toString.getBytes))
-
- //Given this partition is on leader epoch 72
- val epoch = 72
- val log = createLog(logDir, new LogConfig(new Properties))
- log.assignEpochStartOffset(epoch, records.length)
-
- //When appending messages as a leader (i.e. assignOffsets = true)
- for (record <- records)
- log.appendAsLeader(
- MemoryRecords.withRecords(Compression.NONE, record),
- epoch
- )
-
- //Then leader epoch should be set on messages
- for (i <- records.indices) {
- val read = LogTestUtils.readLog(log, i,
1).records.batches.iterator.next()
- assertEquals(72, read.partitionLeaderEpoch, "Should have set leader
epoch")
- }
- }
-
- @Test
- def
followerShouldSaveEpochInformationFromReplicatedMessagesToTheEpochCache(): Unit
= {
- val messageIds = (0 until 50).toArray
- val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
-
- //Given each message has an offset & epoch, as msgs from leader would
- def recordsForEpoch(i: Int): MemoryRecords = {
- val recs = MemoryRecords.withRecords(messageIds(i), Compression.NONE,
records(i))
- recs.batches.forEach{record =>
- record.setPartitionLeaderEpoch(42)
- record.setLastOffset(i)
- }
- recs
- }
-
- val log = createLog(logDir, new LogConfig(new Properties))
-
- //When appending as follower (assignOffsets = false)
- for (i <- records.indices)
- log.appendAsFollower(recordsForEpoch(i), i)
-
- assertEquals(Optional.of(42), log.latestEpoch)
- }
-
- @Test
- def shouldTruncateLeaderEpochsWhenDeletingSegments(): Unit = {
- def createRecords = TestUtils.singletonRecords("test".getBytes)
- val logConfig = LogTestUtils.createLogConfig(segmentBytes =
createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10)
- val log = createLog(logDir, logConfig)
- val cache = epochCache(log)
-
- // Given three segments of 5 messages each
- for (_ <- 0 until 15) {
- log.appendAsLeader(createRecords, 0)
- }
-
- //Given epochs
- cache.assign(0, 0)
- cache.assign(1, 5)
- cache.assign(2, 10)
-
- //When first segment is removed
- log.updateHighWatermark(log.logEndOffset)
- log.deleteOldSegments()
-
- //The oldest epoch entry should have been removed
- assertEquals(util.List.of(new EpochEntry(1, 5), new EpochEntry(2, 10)),
cache.epochEntries)
- }
-
- @Test
- def shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments(): Unit = {
- def createRecords = TestUtils.singletonRecords("test".getBytes)
- val logConfig = LogTestUtils.createLogConfig(segmentBytes =
createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10)
- val log = createLog(logDir, logConfig)
- val cache = epochCache(log)
-
- // Given three segments of 5 messages each
- for (_ <- 0 until 15) {
- log.appendAsLeader(createRecords, 0)
- }
-
- //Given epochs
- cache.assign(0, 0)
- cache.assign(1, 7)
- cache.assign(2, 10)
-
- //When first segment removed (up to offset 5)
- log.updateHighWatermark(log.logEndOffset)
- log.deleteOldSegments()
-
- //The first entry should have gone from (0,0) => (0,5)
- assertEquals(util.List.of(new EpochEntry(0, 5), new EpochEntry(1, 7), new
EpochEntry(2, 10)), cache.epochEntries)
- }
-
- @Test
- def shouldTruncateLeaderEpochCheckpointFileWhenTruncatingLog(): Unit = {
- def createRecords(startOffset: Long, epoch: Int): MemoryRecords = {
- TestUtils.records(Seq(new SimpleRecord("value".getBytes)),
- baseOffset = startOffset, partitionLeaderEpoch = epoch)
- }
-
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 10 *
createRecords(0, 0).sizeInBytes)
- val log = createLog(logDir, logConfig)
- val cache = epochCache(log)
-
- def append(epoch: Int, startOffset: Long, count: Int): Unit = {
- for (i <- 0 until count)
- log.appendAsFollower(createRecords(startOffset + i, epoch), epoch)
- }
-
- //Given 2 segments, 10 messages per segment
- append(epoch = 0, startOffset = 0, count = 10)
- append(epoch = 1, startOffset = 10, count = 6)
- append(epoch = 2, startOffset = 16, count = 4)
-
- assertEquals(2, log.numberOfSegments)
- assertEquals(20, log.logEndOffset)
-
- //When truncate to LEO (no op)
- log.truncateTo(log.logEndOffset)
-
- //Then no change
- assertEquals(3, cache.epochEntries.size)
-
- //When truncate
- log.truncateTo(11)
-
- //Then no change
- assertEquals(2, cache.epochEntries.size)
-
- //When truncate
- log.truncateTo(10)
-
- //Then
- assertEquals(1, cache.epochEntries.size)
-
- //When truncate all
- log.truncateTo(0)
-
- //Then
- assertEquals(0, cache.epochEntries.size)
- }
-
- @Test
- def testFirstUnstableOffsetNoTransactionalData(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 *
5)
- val log = createLog(logDir, logConfig)
-
- val records = MemoryRecords.withRecords(Compression.NONE,
- new SimpleRecord("foo".getBytes),
- new SimpleRecord("bar".getBytes),
- new SimpleRecord("baz".getBytes))
-
- log.appendAsLeader(records, 0)
- assertEquals(Optional.empty, log.firstUnstableOffset)
- }
-
- @Test
- def testFirstUnstableOffsetWithTransactionalData(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 *
5)
- val log = createLog(logDir, logConfig)
-
- val pid = 137L
- val epoch = 5.toShort
- var seq = 0
-
- // add some transactional records
- val records = MemoryRecords.withTransactionalRecords(Compression.NONE,
pid, epoch, seq,
- new SimpleRecord("foo".getBytes),
- new SimpleRecord("bar".getBytes),
- new SimpleRecord("baz".getBytes))
-
- val firstAppendInfo = log.appendAsLeader(records, 0)
- assertEquals(Optional.of(firstAppendInfo.firstOffset),
log.firstUnstableOffset)
-
- // add more transactional records
- seq += 3
-
log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE,
pid, epoch, seq,
- new SimpleRecord("blah".getBytes)), 0)
-
- // LSO should not have changed
- assertEquals(Optional.of(firstAppendInfo.firstOffset),
log.firstUnstableOffset)
-
- // now transaction is committed
- val commitAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid,
epoch, ControlRecordType.COMMIT,
- mockTime.milliseconds(), transactionVersion =
TransactionVersion.TV_0.featureLevel())
-
- // first unstable offset is not updated until the high watermark is
advanced
- assertEquals(Optional.of(firstAppendInfo.firstOffset),
log.firstUnstableOffset)
- log.updateHighWatermark(commitAppendInfo.lastOffset + 1)
-
- // now there should be no first unstable offset
- assertEquals(Optional.empty, log.firstUnstableOffset)
- }
-
@Test
def testReadCommittedWithConcurrentHighWatermarkUpdates(): Unit = {
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 *
5)
@@ -3535,6 +3067,7 @@ class UnifiedLogTest {
}
}
+
@Test
def testTransactionIndexUpdated(): Unit = {
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 *
5)
@@ -3960,7 +3493,7 @@ class UnifiedLogTest {
log.updateHighWatermark(log.logEndOffset)
log.maybeIncrementLogStartOffset(8L,
LogStartOffsetIncrementReason.ClientRecordDeletion)
log.updateHighWatermark(log.logEndOffset)
- log.deleteOldSegments()
+ assertTrue(log.deleteOldSegments > 0, "At least one segment should be deleted")
assertEquals(1, log.logSegments.size)
// the first unstable offset should be lower bounded by the log start
offset
@@ -4160,7 +3693,7 @@ class UnifiedLogTest {
assertEquals(25L, initialHighWatermark)
val initialNumSegments = log.numberOfSegments
- log.deleteOldSegments()
+ assertTrue(log.deleteOldSegments > 0, "At least one segment should be deleted")
assertTrue(log.numberOfSegments < initialNumSegments)
assertTrue(log.logStartOffset <= initialHighWatermark)
}
@@ -4870,7 +4403,7 @@ class UnifiedLogTest {
mockTime.sleep(2)
// It should have rolled the active segment as they are eligible for
deletion
- log.deleteOldSegments()
+ assertEquals(0, log.deleteOldSegments())
assertEquals(2, log.logSegments.size)
log.logSegments.asScala.zipWithIndex.foreach {
case (segment, idx) => assertEquals(idx, segment.baseOffset)
@@ -4878,7 +4411,7 @@ class UnifiedLogTest {
// Once rolled, the segment should be uploaded to remote storage and
eligible for deletion
log.updateHighestOffsetInRemoteStorage(1)
- log.deleteOldSegments()
+ assertTrue(log.deleteOldSegments > 0, "At least one segment should be deleted")
assertEquals(1, log.logSegments.size)
assertEquals(1, log.logSegments.asScala.head.baseOffset())
assertEquals(1, log.localLogStartOffset())
@@ -4910,14 +4443,14 @@ class UnifiedLogTest {
// No segments are uploaded to remote storage, none of the local log
segments should be eligible for deletion
log.updateHighestOffsetInRemoteStorage(-1L)
- log.deleteOldSegments()
+ assertEquals(0, log.deleteOldSegments())
mockTime.sleep(1)
assertEquals(2, log.logSegments.size)
assertFalse(log.isEmpty)
// Update the log-start-offset from 0 to 3, then the base segment should
not be eligible for deletion
log.updateLogStartOffsetFromRemoteTier(3L)
- log.deleteOldSegments()
+ assertEquals(0, log.deleteOldSegments())
mockTime.sleep(1)
assertEquals(2, log.logSegments.size)
assertFalse(log.isEmpty)
@@ -4925,13 +4458,13 @@ class UnifiedLogTest {
// Update the log-start-offset from 3 to 4, then the base segment should
be eligible for deletion now even
// if it is not uploaded to remote storage
log.updateLogStartOffsetFromRemoteTier(4L)
- log.deleteOldSegments()
+ assertTrue(log.deleteOldSegments > 0, "At least one segment should be deleted")
mockTime.sleep(1)
assertEquals(1, log.logSegments.size)
assertFalse(log.isEmpty)
log.updateLogStartOffsetFromRemoteTier(5L)
- log.deleteOldSegments()
+ assertEquals(0, log.deleteOldSegments())
mockTime.sleep(1)
assertEquals(1, log.logSegments.size)
assertTrue(log.isEmpty)
@@ -4954,7 +4487,7 @@ class UnifiedLogTest {
log.updateHighWatermark(log.logEndOffset)
// simulate calls to upload 2 segments to remote storage
log.updateHighestOffsetInRemoteStorage(1)
- log.deleteOldSegments()
+ assertTrue(log.deleteOldSegments > 0, "At least one segment should be deleted")
assertEquals(4, log.logSegments.size())
assertEquals(0, log.logStartOffset)
assertEquals(2, log.localLogStartOffset())
@@ -4979,7 +4512,7 @@ class UnifiedLogTest {
log.updateHighestOffsetInRemoteStorage(1)
mockTime.sleep(1001)
- log.deleteOldSegments()
+ assertTrue(log.deleteOldSegments > 0, "At least one segment should be deleted")
assertEquals(4, log.logSegments.size())
assertEquals(0, log.logStartOffset)
assertEquals(2, log.localLogStartOffset())
@@ -5002,7 +4535,7 @@ class UnifiedLogTest {
log.updateHighWatermark(log.logEndOffset)
// Should not delete local log because highest remote storage offset is -1
(default value)
- log.deleteOldSegments()
+ assertEquals(0, log.deleteOldSegments())
assertEquals(6, log.logSegments.size())
assertEquals(0, log.logStartOffset)
assertEquals(0, log.localLogStartOffset())
@@ -5010,7 +4543,7 @@ class UnifiedLogTest {
// simulate calls to upload 2 segments to remote storage
log.updateHighestOffsetInRemoteStorage(1)
- log.deleteOldSegments()
+ assertTrue(log.deleteOldSegments > 0, "At least one segment should be deleted")
assertEquals(4, log.logSegments.size())
assertEquals(0, log.logStartOffset)
assertEquals(2, log.localLogStartOffset())
@@ -5022,7 +4555,7 @@ class UnifiedLogTest {
// No local logs will be deleted even though local retention bytes is 1
because we'll adopt retention.ms/bytes
// when remote.log.copy.disable = true
- log.deleteOldSegments()
+ assertEquals(0, log.deleteOldSegments())
assertEquals(4, log.logSegments.size())
assertEquals(0, log.logStartOffset)
assertEquals(2, log.localLogStartOffset())
@@ -5042,7 +4575,7 @@ class UnifiedLogTest {
// try to delete local logs again, 2 segments will be deleted this time
because we'll adopt retention.ms/bytes (retention.bytes = 5)
// when remote.log.copy.disable = true
- log.deleteOldSegments()
+ assertTrue(log.deleteOldSegments > 0, "At least one segment should be deleted")
assertEquals(5, log.logSegments.size())
assertEquals(4, log.logStartOffset)
assertEquals(4, log.localLogStartOffset())
@@ -5054,14 +4587,14 @@ class UnifiedLogTest {
// Should not delete any logs because no local logs expired using
retention.ms = 1000
mockTime.sleep(10)
- log.deleteOldSegments()
+ assertEquals(0, log.deleteOldSegments())
assertEquals(5, log.logSegments.size())
assertEquals(4, log.logStartOffset)
assertEquals(4, log.localLogStartOffset())
// Should delete all logs because all of them are expired based on
retentionMs = 1000
mockTime.sleep(1000)
- log.deleteOldSegments()
+ assertTrue(log.deleteOldSegments > 0, "At least one segment should be deleted")
assertEquals(1, log.logSegments.size())
assertEquals(9, log.logStartOffset)
assertEquals(9, log.localLogStartOffset())
@@ -5085,7 +4618,7 @@ class UnifiedLogTest {
// simulate calls to upload 3 segments to remote storage
log.updateHighestOffsetInRemoteStorage(30)
- log.deleteOldSegments()
+ assertTrue(log.deleteOldSegments > 0, "At least one segment should be deleted")
assertEquals(2, log.logSegments.size())
assertEquals(0, log.logStartOffset)
assertEquals(31, log.localLogStartOffset())
@@ -5109,7 +4642,7 @@ class UnifiedLogTest {
// simulate calls to upload 3 segments to remote storage
log.updateHighestOffsetInRemoteStorage(30)
- log.deleteOldSegments()
+ assertTrue(log.deleteOldSegments > 0, "At least one segment should be deleted")
assertEquals(2, log.logSegments.size())
assertEquals(0, log.logStartOffset)
assertEquals(31, log.localLogStartOffset())
diff --git
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
index 50b666a0fb1..6a89b82043d 100644
---
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
+++
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
@@ -16,11 +16,18 @@
*/
package org.apache.kafka.storage.internals.log;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.common.RequestLocal;
import java.io.File;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
public class LogTestUtils {
public static LogSegment createSegment(long offset, File logDir, int
indexIntervalBytes, Time time) throws IOException {
@@ -33,4 +40,119 @@ public class LogTestUtils {
// Create and return the LogSegment instance
return new LogSegment(ms, idx, timeIdx, txnIndex, offset,
indexIntervalBytes, 0, time);
}
+
+
+ /**
+ * Append an end transaction marker (commit or abort) to the log as a
leader.
+ *
+ * @param transactionVersion the transaction version (1 = TV1, 2 = TV2)
etc. Must be explicitly specified.
+ * TV2 markers require strict epoch validation
(markerEpoch > currentEpoch),
+ * while legacy markers use relaxed validation
(markerEpoch >= currentEpoch).
+ */
+ public static LogAppendInfo appendEndTxnMarkerAsLeader(UnifiedLog log,
+ long producerId,
+ short producerEpoch,
+ ControlRecordType
controlType,
+ long timestamp,
+ int
coordinatorEpoch,
+ int leaderEpoch,
+ short
transactionVersion) {
+ MemoryRecords records = endTxnRecords(controlType, producerId,
producerEpoch, 0L, coordinatorEpoch, leaderEpoch, timestamp);
+
+ return log.appendAsLeader(records, leaderEpoch,
AppendOrigin.COORDINATOR, RequestLocal.noCaching(), VerificationGuard.SENTINEL,
transactionVersion);
+ }
+
+ public static MemoryRecords endTxnRecords(ControlRecordType
controlRecordType,
+ long producerId,
+ short epoch,
+ long offset,
+ int coordinatorEpoch,
+ int partitionLeaderEpoch,
+ long timestamp) {
+ EndTransactionMarker marker = new
EndTransactionMarker(controlRecordType, coordinatorEpoch);
+ return MemoryRecords.withEndTransactionMarker(offset, timestamp,
partitionLeaderEpoch, producerId, epoch, marker);
+ }
+
+ public static class LogConfigBuilder {
+ private final Map<String, Object> configs = new HashMap<>();
+
+ public LogConfigBuilder segmentMs(long segmentMs) {
+ configs.put(TopicConfig.SEGMENT_MS_CONFIG, segmentMs);
+ return this;
+ }
+
+ public LogConfigBuilder segmentBytes(int segmentBytes) {
+ configs.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, segmentBytes);
+ return this;
+ }
+
+ public LogConfigBuilder retentionMs(long retentionMs) {
+ configs.put(TopicConfig.RETENTION_MS_CONFIG, retentionMs);
+ return this;
+ }
+
+ public LogConfigBuilder localRetentionMs(long localRetentionMs) {
+ configs.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG,
localRetentionMs);
+ return this;
+ }
+
+ public LogConfigBuilder retentionBytes(long retentionBytes) {
+ configs.put(TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes);
+ return this;
+ }
+
+ public LogConfigBuilder localRetentionBytes(long localRetentionBytes) {
+ configs.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG,
localRetentionBytes);
+ return this;
+ }
+
+ public LogConfigBuilder segmentJitterMs(long segmentJitterMs) {
+ configs.put(TopicConfig.SEGMENT_JITTER_MS_CONFIG, segmentJitterMs);
+ return this;
+ }
+
+ public LogConfigBuilder cleanupPolicy(String cleanupPolicy) {
+ configs.put(TopicConfig.CLEANUP_POLICY_CONFIG, cleanupPolicy);
+ return this;
+ }
+
+ public LogConfigBuilder maxMessageBytes(int maxMessageBytes) {
+ configs.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, maxMessageBytes);
+ return this;
+ }
+
+ public LogConfigBuilder indexIntervalBytes(int indexIntervalBytes) {
+ configs.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG,
indexIntervalBytes);
+ return this;
+ }
+
+ public LogConfigBuilder segmentIndexBytes(int segmentIndexBytes) {
+ configs.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG,
segmentIndexBytes);
+ return this;
+ }
+
+ public LogConfigBuilder fileDeleteDelayMs(long fileDeleteDelayMs) {
+ configs.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG,
fileDeleteDelayMs);
+ return this;
+ }
+
+ public LogConfigBuilder remoteLogStorageEnable(boolean
remoteLogStorageEnable) {
+ configs.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
remoteLogStorageEnable);
+ return this;
+ }
+
+ public LogConfigBuilder remoteLogCopyDisable(boolean
remoteLogCopyDisable) {
+ configs.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG,
remoteLogCopyDisable);
+ return this;
+ }
+
+ public LogConfigBuilder remoteLogDeleteOnDisable(boolean
remoteLogDeleteOnDisable) {
+ configs.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG,
remoteLogDeleteOnDisable);
+ return this;
+ }
+
+ public LogConfig build() {
+ return new LogConfig(configs);
+ }
+ }
}
diff --git
a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
index ea14932ff20..0c7b4271d00 100644
---
a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
+++
b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
@@ -16,17 +16,63 @@
*/
package org.apache.kafka.storage.internals.log;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.server.common.TransactionVersion;
+import org.apache.kafka.server.storage.log.FetchIsolation;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.server.util.Scheduler;
+import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import java.util.function.Supplier;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class UnifiedLogTest {
private final File tmpDir = TestUtils.tempDirectory();
+ private final File logDir = TestUtils.randomPartitionLogDir(tmpDir);
+ private final BrokerTopicStats brokerTopicStats = new
BrokerTopicStats(false);
+ private final MockTime mockTime = new MockTime();
+ private final int maxTransactionTimeoutMs = 60 * 60 * 1000;
+ private final ProducerStateManagerConfig producerStateManagerConfig = new
ProducerStateManagerConfig(maxTransactionTimeoutMs, false);
+ private final List<UnifiedLog> logsToClose = new ArrayList<>();
+
+ private UnifiedLog log;
+
+ @AfterEach
+ public void tearDown() throws IOException {
+ brokerTopicStats.close();
+ for (UnifiedLog log : logsToClose) {
+ log.close();
+ }
+ Utils.delete(tmpDir);
+ }
@Test
public void testOffsetFromProducerSnapshotFile() {
@@ -34,4 +80,671 @@ public class UnifiedLogTest {
File snapshotFile = LogFileUtils.producerSnapshotFile(tmpDir, offset);
assertEquals(offset, UnifiedLog.offsetFromFile(snapshotFile));
}
+
+ @Test
+ public void shouldApplyEpochToMessageOnAppendIfLeader() throws IOException
{
+ SimpleRecord[] records = java.util.stream.IntStream.range(0, 50)
+ .mapToObj(id -> new SimpleRecord(String.valueOf(id).getBytes()))
+ .toArray(SimpleRecord[]::new);
+
+ // Given this partition is on leader epoch 72
+ int epoch = 72;
+ try (UnifiedLog log = createLog(logDir, new LogConfig(new
Properties()))) {
+ log.assignEpochStartOffset(epoch, records.length);
+
+ // When appending messages as a leader (i.e. assignOffsets = true)
+ for (SimpleRecord record : records) {
+ log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE,
record), epoch);
+ }
+
+ // Then leader epoch should be set on messages
+ for (int i = 0; i < records.length; i++) {
+ FetchDataInfo read = log.read(i, 1, FetchIsolation.LOG_END,
true);
+ RecordBatch batch = read.records.batches().iterator().next();
+ assertEquals(epoch, batch.partitionLeaderEpoch(), "Should have
set leader epoch");
+ }
+ }
+ }
+
+ @Test
+ public void
followerShouldSaveEpochInformationFromReplicatedMessagesToTheEpochCache()
throws IOException {
+ int[] messageIds = java.util.stream.IntStream.range(0, 50).toArray();
+ SimpleRecord[] records = Arrays.stream(messageIds)
+ .mapToObj(id -> new SimpleRecord(String.valueOf(id).getBytes()))
+ .toArray(SimpleRecord[]::new);
+
+ //Given each message has an offset & epoch, as msgs from leader would
+ Function<Integer, MemoryRecords> recordsForEpoch = i -> {
+ MemoryRecords recs = MemoryRecords.withRecords(messageIds[i],
Compression.NONE, records[i]);
+ recs.batches().forEach(record -> {
+ record.setPartitionLeaderEpoch(42);
+ record.setLastOffset(i);
+ });
+ return recs;
+ };
+
+ try (UnifiedLog log = createLog(logDir, new LogConfig(new
Properties()))) {
+ // Given each message has an offset & epoch, as msgs from leader
would
+ for (int i = 0; i < records.length; i++) {
+ log.appendAsFollower(recordsForEpoch.apply(i), i);
+ }
+
+ assertEquals(Optional.of(42), log.latestEpoch());
+ }
+ }
+
+ @Test
+ public void shouldTruncateLeaderEpochsWhenDeletingSegments() throws
IOException {
+ Supplier<MemoryRecords> records = () ->
singletonRecords("test".getBytes());
+ LogConfig config = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(records.get().sizeInBytes() * 5)
+ .retentionBytes(records.get().sizeInBytes() * 10L)
+ .build();
+
+ log = createLog(logDir, config);
+ LeaderEpochFileCache cache = epochCache(log);
+
+ // Given three segments of 5 messages each
+ for (int i = 0; i < 15; i++) {
+ log.appendAsLeader(records.get(), 0);
+ }
+
+ // Given epochs
+ cache.assign(0, 0);
+ cache.assign(1, 5);
+ cache.assign(2, 10);
+
+ // When first segment is removed
+ log.updateHighWatermark(log.logEndOffset());
+ assertTrue(log.deleteOldSegments() > 0, "At least one segment should
be deleted");
+
+ //The oldest epoch entry should have been removed
+ assertEquals(List.of(new EpochEntry(1, 5), new EpochEntry(2, 10)),
cache.epochEntries());
+ }
+
+ @Test
+ public void shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments() throws
IOException {
+ Supplier<MemoryRecords> records = () ->
singletonRecords("test".getBytes());
+ LogConfig config = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(records.get().sizeInBytes() * 5)
+ .retentionBytes(records.get().sizeInBytes() * 10L)
+ .build();
+
+ log = createLog(logDir, config);
+ LeaderEpochFileCache cache = epochCache(log);
+
+ // Given three segments of 5 messages each
+ for (int i = 0; i < 15; i++) {
+ log.appendAsLeader(records.get(), 0);
+ }
+
+ // Given epochs
+ cache.assign(0, 0);
+ cache.assign(1, 7);
+ cache.assign(2, 10);
+
+ // When first segment removed (up to offset 5)
+ log.updateHighWatermark(log.logEndOffset());
+ assertTrue(log.deleteOldSegments() > 0, "At least one segment should
be deleted");
+
+ //The first entry should have gone from (0,0) => (0,5)
+ assertEquals(List.of(new EpochEntry(0, 5), new EpochEntry(1, 7), new
EpochEntry(2, 10)), cache.epochEntries());
+ }
+
+ @Test
+ public void shouldTruncateLeaderEpochCheckpointFileWhenTruncatingLog()
throws IOException {
+ Supplier<MemoryRecords> records = () -> records(List.of(new
SimpleRecord("value".getBytes())), 0, 0);
+ LogConfig config = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(10 * records.get().sizeInBytes())
+ .build();
+ log = createLog(logDir, config);
+ LeaderEpochFileCache cache = epochCache(log);
+
+ //Given 2 segments, 10 messages per segment
+ append(0, 0, 10);
+ append(1, 10, 6);
+ append(2, 16, 4);
+
+ assertEquals(2, log.numberOfSegments());
+ assertEquals(20, log.logEndOffset());
+
+ // When truncate to LEO (no op)
+ log.truncateTo(log.logEndOffset());
+ // Then no change
+ assertEquals(3, cache.epochEntries().size());
+
+ // When truncate
+ log.truncateTo(11);
+ // Then no change
+ assertEquals(2, cache.epochEntries().size());
+
+ // When truncate
+ log.truncateTo(10);
+ assertEquals(1, cache.epochEntries().size());
+
+ // When truncate all
+ log.truncateTo(0);
+ assertEquals(0, cache.epochEntries().size());
+ }
+
+ @Test
+ public void shouldDeleteSizeBasedSegments() throws IOException {
+ Supplier<MemoryRecords> records = () ->
singletonRecords("test".getBytes());
+ LogConfig config = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(records.get().sizeInBytes() * 5)
+ .retentionBytes(records.get().sizeInBytes() * 10L)
+ .build();
+ log = createLog(logDir, config);
+
+ // append some messages to create some segments
+ for (int i = 0; i < 15; i++) {
+ log.appendAsLeader(records.get(), 0);
+ }
+
+ log.updateHighWatermark(log.logEndOffset());
+ assertTrue(log.deleteOldSegments() > 0, "At least one segment should
be deleted");
+ assertEquals(2, log.numberOfSegments(), "should have 2 segments");
+ }
+
+ @Test
+ public void shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize()
throws IOException {
+ Supplier<MemoryRecords> records = () ->
singletonRecords("test".getBytes());
+ LogConfig config = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(records.get().sizeInBytes() * 5)
+ .retentionBytes(records.get().sizeInBytes() * 15L)
+ .build();
+
+ log = createLog(logDir, config);
+
+ // append some messages to create some segments
+ for (int i = 0; i < 15; i++) {
+ log.appendAsLeader(records.get(), 0);
+ }
+
+ log.updateHighWatermark(log.logEndOffset());
+ assertEquals(0, log.deleteOldSegments());
+ assertEquals(3, log.numberOfSegments(), "should have 3 segments");
+ }
+
+ @Test
+ public void shouldDeleteTimeBasedSegmentsReadyToBeDeleted() throws
IOException {
+ Supplier<MemoryRecords> records = () ->
singletonRecords("test".getBytes(), 10L);
+ LogConfig config = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(records.get().sizeInBytes() * 15)
+ .retentionMs(10000L)
+ .build();
+ log = createLog(logDir, config);
+
+ // append some messages to create some segments
+ for (int i = 0; i < 15; i++) {
+ log.appendAsLeader(records.get(), 0);
+ }
+
+ log.updateHighWatermark(log.logEndOffset());
+ assertTrue(log.deleteOldSegments() > 0, "At least one segment should
be deleted");
+ assertEquals(1, log.numberOfSegments(), "There should be 1 segment
remaining");
+ }
+
+ @Test
+ public void shouldNotDeleteTimeBasedSegmentsWhenNoneReadyToBeDeleted()
throws IOException {
+ Supplier<MemoryRecords> records = () ->
singletonRecords("test".getBytes(), mockTime.milliseconds());
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(records.get().sizeInBytes() * 5)
+ .retentionMs(10000000)
+ .build();
+ log = createLog(logDir, logConfig);
+
+ // append some messages to create some segments
+ for (int i = 0; i < 15; i++) {
+ log.appendAsLeader(records.get(), 0);
+ }
+
+ log.updateHighWatermark(log.logEndOffset());
+ assertEquals(0, log.deleteOldSegments());
+ assertEquals(3, log.numberOfSegments(), "There should be 3 segments
remaining");
+ }
+
+ @Test
+ public void shouldNotDeleteSegmentsWhenPolicyDoesNotIncludeDelete() throws
IOException {
+ Supplier<MemoryRecords> records = () ->
singletonRecords("test".getBytes(), "test".getBytes(), 10L);
+ LogConfig config = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(records.get().sizeInBytes() * 5)
+ .retentionMs(10000)
+ .cleanupPolicy("compact")
+ .build();
+ log = createLog(logDir, config);
+
+ // append some messages to create some segments
+ for (int i = 0; i < 15; i++) {
+ log.appendAsLeader(records.get(), 0);
+ }
+
+ // mark the oldest segment as older than retention.ms
+ log.logSegments().iterator().next().setLastModified(mockTime.milliseconds() - 20000);
+
+ int segments = log.numberOfSegments();
+ log.updateHighWatermark(log.logEndOffset());
+ assertEquals(0, log.deleteOldSegments());
+ assertEquals(segments, log.numberOfSegments(), "There should be 3
segments remaining");
+ }
+
+ @Test
+ public void
shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete()
throws IOException {
+ Supplier<MemoryRecords> records = () ->
singletonRecords("test".getBytes(), "test".getBytes(), 10L);
+ LogConfig config = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(records.get().sizeInBytes() * 5)
+ .retentionBytes(records.get().sizeInBytes() * 10L)
+ .cleanupPolicy("compact, delete")
+ .build();
+
+ log = createLog(logDir, config);
+
+ // append some messages to create some segments
+ for (int i = 0; i < 15; i++) {
+ log.appendAsLeader(records.get(), 0);
+ }
+
+ log.updateHighWatermark(log.logEndOffset());
+ assertTrue(log.deleteOldSegments() > 0, "At least one segment should
be deleted");
+ assertEquals(1, log.numberOfSegments(), "There should be 1 segment
remaining");
+ }
+
+ @Test
+ public void
shouldDeleteLocalLogSegmentsWhenPolicyIsEmptyWithSizeRetention() throws
IOException {
+ Supplier<MemoryRecords> records = () ->
singletonRecords("test".getBytes(), "test".getBytes(), 10L);
+ int recordSize = records.get().sizeInBytes();
+ LogConfig config = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(recordSize * 2)
+ .retentionBytes(recordSize / 2)
+ .cleanupPolicy("")
+ .remoteLogStorageEnable(true)
+ .build();
+ log = createLog(logDir, config, true);
+
+ for (int i = 0; i < 10; i++) {
+ log.appendAsLeader(records.get(), 0);
+ }
+
+ int segmentsBefore = log.numberOfSegments();
+ log.updateHighWatermark(log.logEndOffset());
+ log.updateHighestOffsetInRemoteStorage(log.logEndOffset() - 1);
+ int deletedSegments = log.deleteOldSegments();
+
+ assertTrue(log.numberOfSegments() < segmentsBefore, "Some segments
should be deleted due to size retention");
+ assertTrue(deletedSegments > 0, "At least one segment should be
deleted");
+ }
+
+ @Test
+ public void shouldDeleteLocalLogSegmentsWhenPolicyIsEmptyWithMsRetention()
throws IOException {
+ long oldTimestamp = mockTime.milliseconds() - 20000;
+ Supplier<MemoryRecords> oldRecords = () ->
singletonRecords("test".getBytes(), "test".getBytes(), oldTimestamp);
+ int recordSize = oldRecords.get().sizeInBytes();
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(recordSize * 2)
+ .localRetentionMs(5000)
+ .cleanupPolicy("")
+ .remoteLogStorageEnable(true)
+ .build();
+ log = createLog(logDir, logConfig, true);
+
+ for (int i = 0; i < 10; i++) {
+ log.appendAsLeader(oldRecords.get(), 0);
+ }
+
+ Supplier<MemoryRecords> newRecords = () ->
singletonRecords("test".getBytes(), "test".getBytes(), mockTime.milliseconds());
+ for (int i = 0; i < 5; i++) {
+ log.appendAsLeader(newRecords.get(), 0);
+ }
+
+ int segmentsBefore = log.numberOfSegments();
+
+ log.updateHighWatermark(log.logEndOffset());
+ log.updateHighestOffsetInRemoteStorage(log.logEndOffset() - 1);
+ int deletedSegments = log.deleteOldSegments();
+
+ assertTrue(log.numberOfSegments() < segmentsBefore, "Some segments
should be deleted due to time retention");
+ assertTrue(deletedSegments > 0, "At least one segment should be
deleted");
+ }
+
+ @Test
+ public void testLogDeletionAfterDeleteRecords() throws IOException {
+ Supplier<MemoryRecords> records = () ->
singletonRecords("test".getBytes());
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(records.get().sizeInBytes() * 5)
+ .build();
+ log = createLog(logDir, logConfig);
+
+ for (int i = 0; i < 15; i++) {
+ log.appendAsLeader(records.get(), 0);
+ }
+ assertEquals(3, log.numberOfSegments());
+ assertEquals(0, log.logStartOffset());
+ log.updateHighWatermark(log.logEndOffset());
+
+ // The log start offset still falls within the first segment, so no segment is deleted.
+ log.maybeIncrementLogStartOffset(1, LogStartOffsetIncrementReason.ClientRecordDeletion);
+ assertEquals(0, log.deleteOldSegments());
+ assertEquals(3, log.numberOfSegments());
+ assertEquals(1, log.logStartOffset());
+
+ log.maybeIncrementLogStartOffset(6,
LogStartOffsetIncrementReason.ClientRecordDeletion);
+ assertTrue(log.deleteOldSegments() > 0, "At least one segment should
be deleted");
+ assertEquals(2, log.numberOfSegments());
+ assertEquals(6, log.logStartOffset());
+
+ log.maybeIncrementLogStartOffset(15,
LogStartOffsetIncrementReason.ClientRecordDeletion);
+ assertTrue(log.deleteOldSegments() > 0, "At least one segment should
be deleted");
+ assertEquals(1, log.numberOfSegments());
+ assertEquals(15, log.logStartOffset());
+ }
+
+ @Test
+ public void testLogDeletionAfterClose() throws IOException {
+ Supplier<MemoryRecords> records = () ->
singletonRecords("test".getBytes(), mockTime.milliseconds() - 1000);
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(records.get().sizeInBytes() * 5)
+ .segmentIndexBytes(1000)
+ .retentionMs(999)
+ .build();
+ log = createLog(logDir, logConfig);
+ // skip the tearDown close because this test closes the log itself
+ logsToClose.remove(log);
+
+ // append some messages to create some segments
+ log.appendAsLeader(records.get(), 0);
+
+ assertEquals(1, log.numberOfSegments(), "The deleted segments should
be gone.");
+ assertEquals(1, epochCache(log).epochEntries().size(), "Epoch entries
should have gone.");
+
+ log.close();
+ log.delete();
+ assertEquals(0, log.numberOfSegments());
+ assertEquals(0, epochCache(log).epochEntries().size(), "Epoch entries
should have gone.");
+ }
+
+ @Test
+ public void testDeleteOldSegments() throws IOException {
+ Supplier<MemoryRecords> records = () ->
singletonRecords("test".getBytes(), mockTime.milliseconds() - 1000);
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(records.get().sizeInBytes() * 5)
+ .segmentIndexBytes(1000)
+ .retentionMs(999)
+ .build();
+ log = createLog(logDir, logConfig);
+ // skip the tearDown close because this test deletes the log itself
+ logsToClose.remove(log);
+
+ // append some messages to create some segments
+ for (int i = 0; i < 100; i++) {
+ log.appendAsLeader(records.get(), 0);
+ }
+
+ log.assignEpochStartOffset(0, 40);
+ log.assignEpochStartOffset(1, 90);
+
+ // segments are not eligible for deletion if no high watermark has
been set
+ int numSegments = log.numberOfSegments();
+ assertEquals(0, log.deleteOldSegments());
+ assertEquals(numSegments, log.numberOfSegments());
+ assertEquals(0L, log.logStartOffset());
+
+ // only segments with offset before the current high watermark are
eligible for deletion
+ for (long hw = 25; hw <= 30; hw++) {
+ log.updateHighWatermark(hw);
+ log.deleteOldSegments();
+ assertTrue(log.logStartOffset() <= hw);
+ long finalHw = hw;
+ log.logSegments().forEach(segment -> {
+ FetchDataInfo segmentFetchInfo;
+ try {
+ segmentFetchInfo = segment.read(segment.baseOffset(),
Integer.MAX_VALUE);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ Optional<RecordBatch> lastBatch = Optional.empty();
+ for (RecordBatch batch : segmentFetchInfo.records.batches()) {
+ lastBatch = Optional.of(batch);
+ }
+ lastBatch.ifPresent(batch -> assertTrue(batch.lastOffset() >=
finalHw));
+ });
+ }
+
+ log.updateHighWatermark(log.logEndOffset());
+ assertTrue(log.deleteOldSegments() > 0, "At least one segment should
be deleted");
+ assertEquals(1, log.numberOfSegments(), "The deleted segments should
be gone.");
+ assertEquals(1, epochCache(log).epochEntries().size(), "Epoch entries
should have gone.");
+ assertEquals(new EpochEntry(1, 100),
epochCache(log).epochEntries().get(0), "Epoch entry should be the latest epoch
and the leo.");
+
+ for (int i = 0; i < 100; i++) {
+ log.appendAsLeader(records.get(), 0);
+ }
+
+ log.delete();
+ assertEquals(0, log.numberOfSegments(), "The number of segments should
be 0");
+ assertEquals(0, log.deleteOldSegments(), "The number of deleted
segments should be zero.");
+ assertEquals(0, epochCache(log).epochEntries().size(), "Epoch entries
should have gone.");
+ }
+
+ @Test
+ public void
shouldDeleteStartOffsetBreachedSegmentsWhenPolicyDoesNotIncludeDelete() throws
IOException {
+ Supplier<MemoryRecords> records = () ->
singletonRecords("test".getBytes(), "test".getBytes(), 10L);
+ int recordsPerSegment = 5;
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(records.get().sizeInBytes() * recordsPerSegment)
+ .segmentIndexBytes(1000)
+ .cleanupPolicy("compact")
+ .build();
+ log = createLog(logDir, logConfig);
+
+ // append some messages to create some segments
+ for (int i = 0; i < 15; i++) {
+ log.appendAsLeader(records.get(), 0);
+ }
+
+ assertEquals(3, log.numberOfSegments());
+ log.updateHighWatermark(log.logEndOffset());
+ log.maybeIncrementLogStartOffset(recordsPerSegment,
LogStartOffsetIncrementReason.ClientRecordDeletion);
+
+ // The first segment, which is entirely before the log start offset, should be deleted.
+ // Of the remaining segments, the first can overlap the log start offset and the rest must
+ // have a base offset greater than the start offset.
+ log.updateHighWatermark(log.logEndOffset());
+ assertTrue(log.deleteOldSegments() > 0, "At least one segment should
be deleted");
+ assertEquals(2, log.numberOfSegments(), "There should be 2 segments
remaining");
+ assertTrue(log.logSegments().iterator().next().baseOffset() <=
log.logStartOffset());
+ log.logSegments().forEach(segment -> {
+ if (log.logSegments().iterator().next() != segment) {
+ assertTrue(segment.baseOffset() > log.logStartOffset());
+ }
+ });
+ }
+
+ @Test
+ public void testFirstUnstableOffsetNoTransactionalData() throws IOException {
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(1024 * 1024 * 5)
+ .build();
+ log = createLog(logDir, logConfig);
+
+ MemoryRecords records = MemoryRecords.withRecords(Compression.NONE,
+ new SimpleRecord("foo".getBytes()),
+ new SimpleRecord("bar".getBytes()),
+ new SimpleRecord("baz".getBytes()));
+
+ log.appendAsLeader(records, 0);
+ assertEquals(Optional.empty(), log.firstUnstableOffset());
+ }
+
+ @Test
+ public void testFirstUnstableOffsetWithTransactionalData() throws IOException {
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(1024 * 1024 * 5)
+ .build();
+ log = createLog(logDir, logConfig);
+
+ long pid = 137L;
+ short epoch = 5;
+ int seq = 0;
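+ // a single producer (fixed pid/epoch, continuing sequence) whose batches form one open transaction until the commit marker below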
+
+ // add some transactional records
+ MemoryRecords records = MemoryRecords.withTransactionalRecords(
+ Compression.NONE, pid, epoch, seq,
+ new SimpleRecord("foo".getBytes()),
+ new SimpleRecord("bar".getBytes()),
+ new SimpleRecord("baz".getBytes()));
+
+ LogAppendInfo firstAppendInfo = log.appendAsLeader(records, 0);
+ assertEquals(Optional.of(firstAppendInfo.firstOffset()), log.firstUnstableOffset());
+
+ // add more transactional records
+ seq += 3;
+ log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, pid, epoch, seq,
+ new SimpleRecord("blah".getBytes())), 0);
+ assertEquals(Optional.of(firstAppendInfo.firstOffset()), log.firstUnstableOffset());
+
+ // now transaction is committed
+ LogAppendInfo commitAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch,
+ ControlRecordType.COMMIT, mockTime.milliseconds(), 0, 0, TransactionVersion.TV_0.featureLevel());
+
+ // first unstable offset is not updated until the high watermark is advanced
+ assertEquals(Optional.of(firstAppendInfo.firstOffset()), log.firstUnstableOffset());
+ log.updateHighWatermark(commitAppendInfo.lastOffset() + 1);
+
+ // now there should be no first unstable offset
+ assertEquals(Optional.empty(), log.firstUnstableOffset());
+ }
+
+ private void append(int epoch, long startOffset, int count) {
+ Function<Integer, MemoryRecords> records = i ->
+ records(List.of(new SimpleRecord("value".getBytes())),
startOffset + i, epoch);
+ for (int i = 0; i < count; i++) {
+ log.appendAsFollower(records.apply(i), epoch);
+ }
+ }
+
+ private LeaderEpochFileCache epochCache(UnifiedLog log) {
+ return log.leaderEpochCache();
+ }
+
+ private UnifiedLog createLog(File dir, LogConfig config) throws IOException {
+ return createLog(dir, config, false);
+ }
+
+ private UnifiedLog createLog(File dir, LogConfig config, boolean remoteStorageSystemEnable) throws IOException {
+ return createLog(dir, config, this.brokerTopicStats, mockTime.scheduler, this.mockTime,
+ this.producerStateManagerConfig, Optional.empty(), remoteStorageSystemEnable);
+ }
+
+ private UnifiedLog createLog(
+ File dir,
+ LogConfig config,
+ BrokerTopicStats brokerTopicStats,
+ Scheduler scheduler,
+ MockTime time,
+ ProducerStateManagerConfig producerStateManagerConfig,
+ Optional<Uuid> topicId,
+ boolean remoteStorageSystemEnable) throws IOException {
+
+ UnifiedLog log = UnifiedLog.create(
+ dir,
+ config,
+ 0L,
+ 0L,
+ scheduler,
+ brokerTopicStats,
+ time,
+ 3600000,
+ producerStateManagerConfig,
+ TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+ new LogDirFailureChannel(10),
+ true,
+ topicId,
+ new ConcurrentHashMap<>(),
+ remoteStorageSystemEnable,
+ LogOffsetsListener.NO_OP_OFFSETS_LISTENER
+ );
+
+ this.logsToClose.add(log);
+ return log;
+ }
+
+ public static MemoryRecords singletonRecords(byte[] value, byte[] key) {
+ return singletonRecords(value, key, Compression.NONE, RecordBatch.NO_TIMESTAMP, RecordBatch.CURRENT_MAGIC_VALUE);
+ }
+
+ public static MemoryRecords singletonRecords(byte[] value, long timestamp) {
+ return singletonRecords(value, null, Compression.NONE, timestamp, RecordBatch.CURRENT_MAGIC_VALUE);
+ }
+
+ public static MemoryRecords singletonRecords(
+ byte[] value
+ ) {
+ return records(List.of(new SimpleRecord(RecordBatch.NO_TIMESTAMP, null, value)),
+ RecordBatch.CURRENT_MAGIC_VALUE,
+ Compression.NONE,
+ RecordBatch.NO_PRODUCER_ID,
+ RecordBatch.NO_PRODUCER_EPOCH,
+ RecordBatch.NO_SEQUENCE,
+ 0,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH
+ );
+ }
+
+ public static MemoryRecords singletonRecords(
+ byte[] value,
+ byte[] key,
+ Compression codec,
+ long timestamp,
+ byte magicValue
+ ) {
+ return records(List.of(new SimpleRecord(timestamp, key, value)), magicValue, codec,
+ RecordBatch.NO_PRODUCER_ID,
+ RecordBatch.NO_PRODUCER_EPOCH,
+ RecordBatch.NO_SEQUENCE,
+ 0,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH
+ );
+ }
+
+ public static MemoryRecords singletonRecords(byte[] value, byte[] key, long timestamp) {
+ return singletonRecords(value, key, Compression.NONE, timestamp, RecordBatch.CURRENT_MAGIC_VALUE);
+ }
+
+ public static MemoryRecords records(List<SimpleRecord> records) {
+ return records(records, RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, RecordBatch.NO_PRODUCER_ID,
+ RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, 0L, RecordBatch.NO_PARTITION_LEADER_EPOCH);
+ }
+
+ public static MemoryRecords records(List<SimpleRecord> records, long baseOffset) {
+ return records(records, RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, RecordBatch.NO_PRODUCER_ID,
+ RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, baseOffset, RecordBatch.NO_PARTITION_LEADER_EPOCH);
+ }
+
+ public static MemoryRecords records(List<SimpleRecord> records, long baseOffset, int partitionLeaderEpoch) {
+ return records(records, RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, RecordBatch.NO_PRODUCER_ID,
+ RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, baseOffset, partitionLeaderEpoch);
+ }
+
+ public static MemoryRecords records(List<SimpleRecord> records, byte magicValue, Compression compression) {
+ return records(records, magicValue, compression, RecordBatch.NO_PRODUCER_ID,
+ RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, 0L, RecordBatch.NO_PARTITION_LEADER_EPOCH);
+ }
+
+ public static MemoryRecords records(List<SimpleRecord> records,
+ byte magicValue,
+ Compression compression,
+ long producerId,
+ short producerEpoch,
+ int sequence,
+ long baseOffset,
+ int partitionLeaderEpoch) {
+ ByteBuffer buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records));
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buf, magicValue, compression, TimestampType.CREATE_TIME, baseOffset,
+ System.currentTimeMillis(), producerId, producerEpoch, sequence, false, partitionLeaderEpoch);
+ for (SimpleRecord record : records) {
+ builder.append(record);
+ }
+ return builder.build();
+ }
}
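For orientation, a minimal sketch (not part of the commit) of the pattern the migrated retention tests above follow; it reuses only names visible in this diff (LogTestUtils.LogConfigBuilder, createLog, singletonRecords, deleteOldSegments), and any behaviour beyond what those lines show is an assumption of the sketch:

    // Illustrative only: logDir, mockTime and the createLog/singletonRecords
    // helpers are the test-class fixtures shown earlier in this diff.
    Supplier<MemoryRecords> records = () -> singletonRecords("test".getBytes(), mockTime.milliseconds());
    LogConfig config = new LogTestUtils.LogConfigBuilder()
        .segmentBytes(records.get().sizeInBytes() * 2)   // a couple of batches per segment
        .retentionMs(999)                                // expire segments almost immediately
        .build();
    UnifiedLog log = createLog(logDir, config);
    for (int i = 0; i < 10; i++) {
        log.appendAsLeader(records.get(), 0);
    }
    log.updateHighWatermark(log.logEndOffset());  // only segments below the high watermark are eligible
    mockTime.sleep(1000);                         // breach retention.ms
    assertTrue(log.deleteOldSegments() > 0, "at least one segment should be deleted");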
diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
index 4c75272edd4..a9e49dd0492 100644
--- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
+++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
@@ -35,6 +35,7 @@ import static java.lang.String.format;
* Helper functions for writing unit tests.
* <p>
* <b>Package-private:</b> Not intended for use outside {@code org.apache.kafka.common.test}.
+ * Use {@code org/apache/kafka/test/TestUtils} instead.
*/
class TestUtils {
private static final Logger log = LoggerFactory.getLogger(TestUtils.class);