This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b706eb4 MINOR: Eliminate redundant functions in LogTest suite (#10732)
b706eb4 is described below
commit b706eb4a9549f9d8448ff5a889c02e48ee07d71d
Author: Kowshik Prakasam <[email protected]>
AuthorDate: Thu May 20 08:46:34 2021 -0700
MINOR: Eliminate redundant functions in LogTest suite (#10732)
Reviewers: Satish Duggana <[email protected]>, Jun Rao <[email protected]>
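This is a test-only refactoring: helper methods that other suites reached through the LogTest companion object (keysInLog, createLogConfig, hasOffsetOverflow, firstOverflowSegment, rawSegment, initializeLogDirWithOverflowedSegment) now live in LogTestUtils, so call sites simply change from LogTest.x to LogTestUtils.x with no behavioral change. As a rough sketch only (the committed code lives in core/src/test/scala/unit/kafka/log/LogTestUtils.scala and is not reproduced here), a relocated helper such as keysInLog plausibly has this shape:

    // Hypothetical sketch, not the literal committed code: collect the keys
    // (parsed as Longs) of every keyed record in every non-control batch of
    // the log. Assumes scala.jdk.CollectionConverters._ is in scope for .asScala.
    def keysInLog(log: Log): Iterable[Long] = {
      for (segment <- log.logSegments;
           batch <- segment.log.batches.asScala if !batch.isControlBatch;
           record <- batch.asScala if record.hasValue && record.hasKey)
        yield TestUtils.readString(record.key).toLong
    }

Centralizing these helpers lets LogCleanerTest, LogLoaderTest and DumpLogSegmentsTest depend on LogTestUtils rather than on an unrelated test class.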
---
.../test/scala/unit/kafka/log/LogCleanerTest.scala | 104 +++---
.../test/scala/unit/kafka/log/LogLoaderTest.scala | 12 +-
core/src/test/scala/unit/kafka/log/LogTest.scala | 348 +++++++--------------
.../test/scala/unit/kafka/log/LogTestUtils.scala | 14 +-
.../unit/kafka/tools/DumpLogSegmentsTest.scala | 6 +-
5 files changed, 178 insertions(+), 306 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 801ba26..99ff1aa 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -72,7 +72,7 @@ class LogCleanerTest {
// append messages to the log until we have four segments
while(log.numberOfSegments < 4)
log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0)
- val keysFound = LogTest.keysInLog(log)
+ val keysFound = LogTestUtils.keysInLog(log)
assertEquals(0L until log.logEndOffset, keysFound)
// pretend we have the following keys
@@ -85,8 +85,8 @@ class LogCleanerTest {
val stats = new CleanerStats()
val expectedBytesRead = segments.map(_.size).sum
cleaner.cleanSegments(log, segments, map, 0L, stats, new CleanedTransactionMetadata)
- val shouldRemain = LogTest.keysInLog(log).filter(!keys.contains(_))
- assertEquals(shouldRemain, LogTest.keysInLog(log))
+ val shouldRemain = LogTestUtils.keysInLog(log).filter(!keys.contains(_))
+ assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
assertEquals(expectedBytesRead, stats.bytesRead)
}
@@ -230,7 +230,7 @@ class LogCleanerTest {
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
assertEquals(List(2, 5, 7), lastOffsetsPerBatchInLog(log))
assertEquals(Map(pid1 -> 2, pid2 -> 2, pid3 -> 1), lastSequencesInLog(log))
- assertEquals(List(2, 3, 1, 4), LogTest.keysInLog(log))
+ assertEquals(List(2, 3, 1, 4), LogTestUtils.keysInLog(log))
assertEquals(List(1, 3, 6, 7), offsetsInLog(log))
// we have to reload the log to validate that the cleaner maintained sequence numbers correctly
@@ -262,7 +262,7 @@ class LogCleanerTest {
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
assertEquals(Map(pid1 -> 2, pid2 -> 2, pid3 -> 1, pid4 -> 0), lastSequencesInLog(log))
assertEquals(List(2, 5, 7, 8), lastOffsetsPerBatchInLog(log))
- assertEquals(List(3, 1, 4, 2), LogTest.keysInLog(log))
+ assertEquals(List(3, 1, 4, 2), LogTestUtils.keysInLog(log))
assertEquals(List(3, 6, 7, 8), offsetsInLog(log))
reloadLog()
@@ -299,7 +299,7 @@ class LogCleanerTest {
log.roll()
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
- assertEquals(List(3, 2), LogTest.keysInLog(log))
+ assertEquals(List(3, 2), LogTestUtils.keysInLog(log))
assertEquals(List(3, 6, 7, 8, 9), offsetsInLog(log))
// ensure the transaction index is still correct
@@ -339,7 +339,7 @@ class LogCleanerTest {
// we have only cleaned the records in the first segment
val dirtyOffset = cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))._1
- assertEquals(List(2, 3, 5, 6, 6, 7, 7, 8, 8, 9, 9, 10), LogTest.keysInLog(log))
+ assertEquals(List(2, 3, 5, 6, 6, 7, 7, 8, 8, 9, 9, 10), LogTestUtils.keysInLog(log))
log.roll()
@@ -349,7 +349,7 @@ class LogCleanerTest {
// finally only the keys from pid3 should remain
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, dirtyOffset, log.activeSegment.baseOffset))
- assertEquals(List(2, 3, 6, 7, 8, 9, 11, 12), LogTest.keysInLog(log))
+ assertEquals(List(2, 3, 6, 7, 8, 9, 11, 12), LogTestUtils.keysInLog(log))
}
@Test
@@ -373,7 +373,7 @@ class LogCleanerTest {
// cannot remove the marker in this pass because there are still valid records
var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
- assertEquals(List(1, 3, 2), LogTest.keysInLog(log))
+ assertEquals(List(1, 3, 2), LogTestUtils.keysInLog(log))
assertEquals(List(0, 2, 3, 4, 5), offsetsInLog(log))
appendProducer(Seq(1, 3))
@@ -382,17 +382,17 @@ class LogCleanerTest {
// the first cleaning preserves the commit marker (at offset 3) since there were still records for the transaction
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
- assertEquals(List(2, 1, 3), LogTest.keysInLog(log))
+ assertEquals(List(2, 1, 3), LogTestUtils.keysInLog(log))
assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log))
// delete horizon forced to 0 to verify marker is not removed early
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = 0L)._1
- assertEquals(List(2, 1, 3), LogTest.keysInLog(log))
+ assertEquals(List(2, 1, 3), LogTestUtils.keysInLog(log))
assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log))
// clean again with large delete horizon and verify the marker is removed
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
- assertEquals(List(2, 1, 3), LogTest.keysInLog(log))
+ assertEquals(List(2, 1, 3), LogTestUtils.keysInLog(log))
assertEquals(List(4, 5, 6, 7, 8), offsetsInLog(log))
}
@@ -421,11 +421,11 @@ class LogCleanerTest {
log.roll()
cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
- assertEquals(List(2), LogTest.keysInLog(log))
+ assertEquals(List(2), LogTestUtils.keysInLog(log))
assertEquals(List(1, 3, 4), offsetsInLog(log))
cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
- assertEquals(List(2), LogTest.keysInLog(log))
+ assertEquals(List(2), LogTestUtils.keysInLog(log))
assertEquals(List(3, 4), offsetsInLog(log))
}
@@ -462,14 +462,14 @@ class LogCleanerTest {
// first time through the records are removed
// Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}]
var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
- assertEquals(List(2, 3), LogTest.keysInLog(log))
+ assertEquals(List(2, 3), LogTestUtils.keysInLog(log))
assertEquals(List(4, 5, 6, 7), offsetsInLog(log))
assertEquals(List(1, 3, 4, 5, 6, 7), lastOffsetsPerBatchInLog(log))
// the empty batch remains if cleaned again because it still holds the last sequence
// Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}]
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
- assertEquals(List(2, 3), LogTest.keysInLog(log))
+ assertEquals(List(2, 3), LogTestUtils.keysInLog(log))
assertEquals(List(4, 5, 6, 7), offsetsInLog(log))
assertEquals(List(1, 3, 4, 5, 6, 7), lastOffsetsPerBatchInLog(log))
@@ -483,13 +483,13 @@ class LogCleanerTest {
// Expected State: [{Producer1: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}]
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
- assertEquals(List(2, 3, 1), LogTest.keysInLog(log))
+ assertEquals(List(2, 3, 1), LogTestUtils.keysInLog(log))
assertEquals(List(4, 5, 6, 7, 8, 9), offsetsInLog(log))
assertEquals(List(1, 4, 5, 6, 7, 8, 9), lastOffsetsPerBatchInLog(log))
// Expected State: [{Producer1: EmptyBatch}, {2}, {3}, {Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}]
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
- assertEquals(List(2, 3, 1), LogTest.keysInLog(log))
+ assertEquals(List(2, 3, 1), LogTestUtils.keysInLog(log))
assertEquals(List(5, 6, 7, 8, 9), offsetsInLog(log))
assertEquals(List(1, 5, 6, 7, 8, 9), lastOffsetsPerBatchInLog(log))
}
@@ -514,14 +514,14 @@ class LogCleanerTest {
// first time through the control batch is retained as an empty batch
// Expected State: [{Producer1: EmptyBatch}], [{2}, {3}]
var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
- assertEquals(List(2, 3), LogTest.keysInLog(log))
+ assertEquals(List(2, 3), LogTestUtils.keysInLog(log))
assertEquals(List(1, 2), offsetsInLog(log))
assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log))
// the empty control batch does not cause an exception when cleaned
// Expected State: [{Producer1: EmptyBatch}], [{2}, {3}]
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
- assertEquals(List(2, 3), LogTest.keysInLog(log))
+ assertEquals(List(2, 3), LogTestUtils.keysInLog(log))
assertEquals(List(1, 2), offsetsInLog(log))
assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log))
}
@@ -600,12 +600,12 @@ class LogCleanerTest {
// delete horizon set to 0 to verify marker is not removed early
val dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = 0L)._1
- assertEquals(List(3), LogTest.keysInLog(log))
+ assertEquals(List(3), LogTestUtils.keysInLog(log))
assertEquals(List(3, 4, 5), offsetsInLog(log))
// clean again with large delete horizon and verify the marker is removed
cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
- assertEquals(List(3), LogTest.keysInLog(log))
+ assertEquals(List(3), LogTestUtils.keysInLog(log))
assertEquals(List(4, 5), offsetsInLog(log))
}
@@ -679,14 +679,14 @@ class LogCleanerTest {
// first time through the records are removed
var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
assertAbortedTransactionIndexed()
- assertEquals(List(), LogTest.keysInLog(log))
+ assertEquals(List(), LogTestUtils.keysInLog(log))
assertEquals(List(2), offsetsInLog(log)) // abort marker is retained
assertEquals(List(1, 2), lastOffsetsPerBatchInLog(log)) // empty batch is retained
// the empty batch remains if cleaned again because it still holds the last sequence
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
assertAbortedTransactionIndexed()
- assertEquals(List(), LogTest.keysInLog(log))
+ assertEquals(List(), LogTestUtils.keysInLog(log))
assertEquals(List(2), offsetsInLog(log)) // abort marker is still retained
assertEquals(List(1, 2), lastOffsetsPerBatchInLog(log)) // empty batch is retained
@@ -696,12 +696,12 @@ class LogCleanerTest {
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
assertAbortedTransactionIndexed()
- assertEquals(List(1), LogTest.keysInLog(log))
+ assertEquals(List(1), LogTestUtils.keysInLog(log))
assertEquals(List(2, 3), offsetsInLog(log)) // abort marker is not yet gone because we read the empty batch
assertEquals(List(2, 3), lastOffsetsPerBatchInLog(log)) // but we do not preserve the empty batch
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
- assertEquals(List(1), LogTest.keysInLog(log))
+ assertEquals(List(1), LogTestUtils.keysInLog(log))
assertEquals(List(3), offsetsInLog(log)) // abort marker is gone
assertEquals(List(3), lastOffsetsPerBatchInLog(log))
@@ -725,7 +725,7 @@ class LogCleanerTest {
while(log.numberOfSegments < 2)
log.appendAsLeader(record(log.logEndOffset.toInt, Array.fill(largeMessageSize)(0: Byte)), leaderEpoch = 0)
- val keysFound = LogTest.keysInLog(log)
+ val keysFound = LogTestUtils.keysInLog(log)
assertEquals(0L until log.logEndOffset, keysFound)
// pretend we have the following keys
@@ -736,8 +736,8 @@ class LogCleanerTest {
// clean the log
val stats = new CleanerStats()
cleaner.cleanSegments(log, Seq(log.logSegments.head), map, 0L, stats, new CleanedTransactionMetadata)
- val shouldRemain = LogTest.keysInLog(log).filter(!keys.contains(_))
- assertEquals(shouldRemain, LogTest.keysInLog(log))
+ val shouldRemain = LogTestUtils.keysInLog(log).filter(!keys.contains(_))
+ assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
}
/**
@@ -749,8 +749,8 @@ class LogCleanerTest {
val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata)
- val shouldRemain = LogTest.keysInLog(log).filter(k => !offsetMap.map.containsKey(k.toString))
- assertEquals(shouldRemain, LogTest.keysInLog(log))
+ val shouldRemain = LogTestUtils.keysInLog(log).filter(k => !offsetMap.map.containsKey(k.toString))
+ assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
}
/**
@@ -797,7 +797,7 @@ class LogCleanerTest {
while(log.numberOfSegments < 2)
log.appendAsLeader(record(log.logEndOffset.toInt, Array.fill(largeMessageSize)(0: Byte)), leaderEpoch = 0)
- val keysFound = LogTest.keysInLog(log)
+ val keysFound = LogTestUtils.keysInLog(log)
assertEquals(0L until log.logEndOffset, keysFound)
// Decrease the log's max message size
@@ -834,7 +834,7 @@ class LogCleanerTest {
log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0)
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, log.activeSegment.baseOffset))
- val keys = LogTest.keysInLog(log).toSet
+ val keys = LogTestUtils.keysInLog(log).toSet
assertTrue((0 until leo.toInt by 2).forall(!keys.contains(_)), "None of the keys we deleted should still exist.")
}
@@ -886,7 +886,7 @@ class LogCleanerTest {
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
assertEquals(List(1, 3, 4), lastOffsetsPerBatchInLog(log))
assertEquals(Map(1L -> 0, 2L -> 1, 3L -> 0), lastSequencesInLog(log))
- assertEquals(List(0, 1), LogTest.keysInLog(log))
+ assertEquals(List(0, 1), LogTestUtils.keysInLog(log))
assertEquals(List(3, 4), offsetsInLog(log))
}
@@ -909,7 +909,7 @@ class LogCleanerTest {
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
assertEquals(List(2, 3), lastOffsetsPerBatchInLog(log))
assertEquals(Map(producerId -> 2), lastSequencesInLog(log))
- assertEquals(List(), LogTest.keysInLog(log))
+ assertEquals(List(), LogTestUtils.keysInLog(log))
assertEquals(List(3), offsetsInLog(log))
// Append a new entry from the producer and verify that the empty batch is cleaned up
@@ -919,7 +919,7 @@ class LogCleanerTest {
assertEquals(List(3, 5), lastOffsetsPerBatchInLog(log))
assertEquals(Map(producerId -> 4), lastSequencesInLog(log))
- assertEquals(List(1, 5), LogTest.keysInLog(log))
+ assertEquals(List(1, 5), LogTestUtils.keysInLog(log))
assertEquals(List(3, 4, 5), offsetsInLog(log))
}
@@ -942,16 +942,16 @@ class LogCleanerTest {
// clean the log with only one message removed
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 2, log.activeSegment.baseOffset))
- assertEquals(List(1,0,1,0), LogTest.keysInLog(log))
+ assertEquals(List(1,0,1,0), LogTestUtils.keysInLog(log))
assertEquals(List(1,2,3,4), offsetsInLog(log))
// continue to make progress, even though we can only clean one message at a time
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 3, log.activeSegment.baseOffset))
- assertEquals(List(0,1,0), LogTest.keysInLog(log))
+ assertEquals(List(0,1,0), LogTestUtils.keysInLog(log))
assertEquals(List(2,3,4), offsetsInLog(log))
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 4, log.activeSegment.baseOffset))
- assertEquals(List(1,0), LogTest.keysInLog(log))
+ assertEquals(List(1,0), LogTestUtils.keysInLog(log))
assertEquals(List(3,4), offsetsInLog(log))
}
@@ -1114,7 +1114,7 @@ class LogCleanerTest {
while(log.numberOfSegments < 4)
log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0)
- val keys = LogTest.keysInLog(log)
+ val keys = LogTestUtils.keysInLog(log)
val map = new FakeOffsetMap(Int.MaxValue)
keys.foreach(k => map.put(key(k), Long.MaxValue))
assertThrows(classOf[LogCleaningAbortedException], () =>
@@ -1312,15 +1312,15 @@ class LogCleanerTest {
logProps.put(LogConfig.FileDeleteDelayMsProp, 1000: java.lang.Integer)
val config = LogConfig.fromProps(logConfig.originals, logProps)
- LogTest.initializeLogDirWithOverflowedSegment(dir)
+ LogTestUtils.initializeLogDirWithOverflowedSegment(dir)
val log = makeLog(config = config, recoveryPoint = Long.MaxValue)
- val segmentWithOverflow = LogTest.firstOverflowSegment(log).getOrElse {
+ val segmentWithOverflow = LogTestUtils.firstOverflowSegment(log).getOrElse {
throw new AssertionError("Failed to create log with a segment which has overflowed offsets")
}
val numSegmentsInitial = log.logSegments.size
- val allKeys = LogTest.keysInLog(log).toList
+ val allKeys = LogTestUtils.keysInLog(log).toList
val expectedKeysAfterCleaning = new mutable.ArrayBuffer[Long]()
// pretend we want to clean every alternate key
@@ -1336,15 +1336,15 @@ class LogCleanerTest {
new CleanedTransactionMetadata)
)
assertEquals(numSegmentsInitial + 1, log.logSegments.size)
- assertEquals(allKeys, LogTest.keysInLog(log))
- assertFalse(LogTest.hasOffsetOverflow(log))
+ assertEquals(allKeys, LogTestUtils.keysInLog(log))
+ assertFalse(LogTestUtils.hasOffsetOverflow(log))
// Clean each segment now that split is complete.
for (segmentToClean <- log.logSegments)
cleaner.cleanSegments(log, List(segmentToClean), offsetMap, 0L, new CleanerStats(),
new CleanedTransactionMetadata)
- assertEquals(expectedKeysAfterCleaning, LogTest.keysInLog(log))
- assertFalse(LogTest.hasOffsetOverflow(log))
+ assertEquals(expectedKeysAfterCleaning, LogTestUtils.keysInLog(log))
+ assertFalse(LogTestUtils.hasOffsetOverflow(log))
log.close()
}
@@ -1374,7 +1374,7 @@ class LogCleanerTest {
log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0)
messageCount += 1
}
- val allKeys = LogTest.keysInLog(log)
+ val allKeys = LogTestUtils.keysInLog(log)
// pretend we have odd-numbered keys
val offsetMap = new FakeOffsetMap(Int.MaxValue)
@@ -1386,7 +1386,7 @@ class LogCleanerTest {
new CleanedTransactionMetadata)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
- var cleanedKeys = LogTest.keysInLog(log)
+ var cleanedKeys = LogTestUtils.keysInLog(log)
log.close()
// 1) Simulate recovery just after .cleaned file is created, before rename to .swap
@@ -1402,7 +1402,7 @@ class LogCleanerTest {
new CleanedTransactionMetadata)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
- cleanedKeys = LogTest.keysInLog(log)
+ cleanedKeys = LogTestUtils.keysInLog(log)
log.close()
// 2) Simulate recovery just after swap file is created, before old segment files are
@@ -1424,7 +1424,7 @@ class LogCleanerTest {
new CleanedTransactionMetadata)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
- cleanedKeys = LogTest.keysInLog(log)
+ cleanedKeys = LogTestUtils.keysInLog(log)
// 3) Simulate recovery after swap file is created and old segments files are renamed
// to .deleted. Clean operation is resumed during recovery.
@@ -1442,7 +1442,7 @@ class LogCleanerTest {
new CleanedTransactionMetadata)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
- cleanedKeys = LogTest.keysInLog(log)
+ cleanedKeys = LogTestUtils.keysInLog(log)
log.close()
// 4) Simulate recovery after swap is complete, but async deletion
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index fd75779..0546db4 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -1341,7 +1341,7 @@ class LogLoaderTest {
@Test
def testFullTransactionIndexRecovery(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 128 * 5)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 128 * 5)
val log = createLog(logDir, logConfig)
val epoch = 0.toShort
@@ -1384,7 +1384,7 @@ class LogLoaderTest {
log.close()
- val reloadedLogConfig = LogTest.createLogConfig(segmentBytes = 1024 * 5)
+ val reloadedLogConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 5)
val reloadedLog = createLog(logDir, reloadedLogConfig, lastShutdownClean = false)
val abortedTransactions = LogTestUtils.allAbortedTransactions(reloadedLog)
assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions)
@@ -1392,7 +1392,7 @@ class LogLoaderTest {
@Test
def testRecoverOnlyLastSegment(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 128 * 5)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 128 * 5)
val log = createLog(logDir, logConfig)
val epoch = 0.toShort
@@ -1435,7 +1435,7 @@ class LogLoaderTest {
log.close()
- val reloadedLogConfig = LogTest.createLogConfig(segmentBytes = 1024 * 5)
+ val reloadedLogConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 5)
val reloadedLog = createLog(logDir, reloadedLogConfig, recoveryPoint = recoveryPoint, lastShutdownClean = false)
val abortedTransactions = LogTestUtils.allAbortedTransactions(reloadedLog)
assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions)
@@ -1443,7 +1443,7 @@ class LogLoaderTest {
@Test
def testRecoverLastSegmentWithNoSnapshots(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 128 * 5)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 128 * 5)
val log = createLog(logDir, logConfig)
val epoch = 0.toShort
@@ -1489,7 +1489,7 @@ class LogLoaderTest {
log.close()
- val reloadedLogConfig = LogTest.createLogConfig(segmentBytes = 1024 * 5)
+ val reloadedLogConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 5)
val reloadedLog = createLog(logDir, reloadedLogConfig, recoveryPoint = recoveryPoint, lastShutdownClean = false)
val abortedTransactions = LogTestUtils.allAbortedTransactions(reloadedLog)
assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions)
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index c8891eb..3054116 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -22,13 +22,13 @@ import java.nio.ByteBuffer
import java.nio.file.Files
import java.util.concurrent.{Callable, Executors}
import java.util.regex.Pattern
-import java.util.{Collections, Optional, Properties}
+import java.util.{Collections, Optional}
import kafka.common.{OffsetsOutOfOrderException, RecordValidationException, UnexpectedAppendOffsetException}
import kafka.log.Log.DeleteDirSuffix
import kafka.metrics.KafkaYammerMetrics
import kafka.server.checkpoints.LeaderEpochCheckpointFile
import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
-import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata, PartitionMetadataFile}
+import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, KafkaConfig, LogOffsetMetadata, PartitionMetadataFile}
import kafka.utils._
import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
import org.apache.kafka.common.errors._
@@ -43,7 +43,7 @@ import org.easymock.EasyMock
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
-import scala.collection.{Iterable, Map}
+import scala.collection.Map
import scala.jdk.CollectionConverters._
import scala.collection.mutable.ListBuffer
@@ -76,7 +76,7 @@ class LogTest {
@Test
def testHighWatermarkMetadataUpdatedAfterSegmentRoll(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
val log = createLog(logDir, logConfig)
def assertFetchSizeAndOffsets(fetchOffset: Long,
@@ -112,7 +112,7 @@ class LogTest {
@Test
def testAppendAsLeaderWithRaftLeader(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
val log = createLog(logDir, logConfig)
val leaderEpoch = 0
@@ -137,7 +137,7 @@ class LogTest {
@Test
def testAppendInfoFirstOffset(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
val log = createLog(logDir, logConfig)
val simpleRecords = List(
@@ -179,7 +179,7 @@ class LogTest {
// resets the producer state. Specifically we are testing the case when
// the segment position of the first unstable offset is unknown.
- val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
val log = createLog(logDir, logConfig)
val producerId = 17L
@@ -217,7 +217,7 @@ class LogTest {
@Test
def testHighWatermarkMaintenance(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
val log = createLog(logDir, logConfig)
val leaderEpoch = 0
@@ -303,7 +303,7 @@ class LogTest {
@Test
def testFetchUpToLogEndOffset(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
val log = createLog(logDir, logConfig)
log.appendAsLeader(TestUtils.records(List(
@@ -323,7 +323,7 @@ class LogTest {
@Test
def testFetchUpToHighWatermark(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
val log = createLog(logDir, logConfig)
log.appendAsLeader(TestUtils.records(List(
@@ -357,7 +357,7 @@ class LogTest {
@Test
def testActiveProducers(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
val log = createLog(logDir, logConfig)
def assertProducerState(
@@ -415,7 +415,7 @@ class LogTest {
@Test
def testFetchUpToLastStableOffset(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
val log = createLog(logDir, logConfig)
val epoch = 0.toShort
@@ -501,7 +501,7 @@ class LogTest {
@Test
def testTimeBasedLogRoll(): Unit = {
def createRecords = TestUtils.singletonRecords("test".getBytes)
- val logConfig = LogTest.createLogConfig(segmentMs = 1 * 60 * 60L)
+ val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L)
// create a log
val log = createLog(logDir, logConfig, maxProducerIdExpirationMs = 24 * 60)
@@ -548,7 +548,7 @@ class LogTest {
@Test
def testRollSegmentThatAlreadyExists(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentMs = 1 * 60 * 60L)
+ val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L)
// create a log
val log = createLog(logDir, logConfig)
@@ -674,7 +674,7 @@ class LogTest {
val msgPerSeg = 10
val segmentSize = msgPerSeg * setSize // each segment will be 10 messages
// create a log
- val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = segmentSize)
val log = createLog(logDir, logConfig)
assertEquals(1, log.numberOfSegments, "There should be exactly 1 segment.")
@@ -701,7 +701,7 @@ class LogTest {
def testInitializationOfProducerSnapshotsUpgradePath(): Unit = {
// simulate the upgrade path by creating a new log with several segments, deleting the
// snapshot files, and then reloading the log
- val logConfig = LogTest.createLogConfig(segmentBytes = 64 * 10)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 64 * 10)
var log = createLog(logDir, logConfig)
assertEquals(None, log.oldestProducerSnapshotOffset)
@@ -740,7 +740,7 @@ class LogTest {
@Test
def testLogReinitializeAfterManualDelete(): Unit = {
- val logConfig = LogTest.createLogConfig()
+ val logConfig = LogTestUtils.createLogConfig()
// simulate a case where log data does not exist but the start offset is non-zero
val log = createLog(logDir, logConfig, logStartOffset = 500)
assertEquals(500, log.logStartOffset)
@@ -749,7 +749,7 @@ class LogTest {
@Test
def testLogEndLessThanStartAfterReopen(): Unit = {
- val logConfig = LogTest.createLogConfig()
+ val logConfig = LogTestUtils.createLogConfig()
var log = createLog(logDir, logConfig)
for (i <- 0 until 5) {
val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes)
@@ -782,7 +782,7 @@ class LogTest {
@Test
def testNonActiveSegmentsFrom(): Unit = {
- val logConfig = LogTest.createLogConfig()
+ val logConfig = LogTestUtils.createLogConfig()
val log = createLog(logDir, logConfig)
for (i <- 0 until 5) {
@@ -804,7 +804,7 @@ class LogTest {
@Test
def testInconsistentLogSegmentRange(): Unit = {
- val logConfig = LogTest.createLogConfig()
+ val logConfig = LogTestUtils.createLogConfig()
val log = createLog(logDir, logConfig)
for (i <- 0 until 5) {
@@ -818,7 +818,7 @@ class LogTest {
@Test
def testLogDelete(): Unit = {
- val logConfig = LogTest.createLogConfig()
+ val logConfig = LogTestUtils.createLogConfig()
val log = createLog(logDir, logConfig)
for (i <- 0 to 100) {
@@ -846,7 +846,7 @@ class LogTest {
val scheduler = new KafkaScheduler(1)
try {
scheduler.startup()
- val logConfig = LogTest.createLogConfig()
+ val logConfig = LogTestUtils.createLogConfig()
val log = createLog(logDir, logConfig, scheduler = scheduler)
val producerExpireCheck = log.producerExpireCheck
@@ -875,7 +875,7 @@ class LogTest {
@Test
def testProducerIdMapOffsetUpdatedForNonIdempotentData(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
val log = createLog(logDir, logConfig)
val records = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)))
log.appendAsLeader(records, leaderEpoch = 0)
@@ -885,7 +885,7 @@ class LogTest {
@Test
def testRebuildProducerIdMapWithCompactedData(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
val log = createLog(logDir, logConfig)
val pid = 1L
val epoch = 0.toShort
@@ -928,7 +928,7 @@ class LogTest {
@Test
def testRebuildProducerStateWithEmptyCompactedBatch(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
val log = createLog(logDir, logConfig)
val pid = 1L
val epoch = 0.toShort
@@ -969,7 +969,7 @@ class LogTest {
@Test
def testUpdateProducerIdMapWithCompactedData(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
val log = createLog(logDir, logConfig)
val pid = 1L
val epoch = 0.toShort
@@ -1002,7 +1002,7 @@ class LogTest {
@Test
def testProducerIdMapTruncateTo(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
val log = createLog(logDir, logConfig)
log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes))), leaderEpoch = 0)
log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes))), leaderEpoch = 0)
@@ -1027,7 +1027,7 @@ class LogTest {
@Test
def testProducerIdMapTruncateToWithNoSnapshots(): Unit = {
// This ensures that the upgrade optimization path cannot be hit after initial loading
- val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
val log = createLog(logDir, logConfig)
val pid = 1L
val epoch = 0.toShort
@@ -1051,7 +1051,7 @@ class LogTest {
@Test
def testRetentionDeletesProducerStateSnapshots(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = 0, retentionMs = 1000 * 60, fileDeleteDelayMs = 0)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = 0, retentionMs = 1000 * 60, fileDeleteDelayMs = 0)
val log = createLog(logDir, logConfig)
val pid1 = 1L
val epoch = 0.toShort
@@ -1079,7 +1079,7 @@ class LogTest {
@Test
def testRetentionIdempotency(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = -1, retentionMs = 900, fileDeleteDelayMs = 0)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = -1, retentionMs = 900, fileDeleteDelayMs = 0)
val log = createLog(logDir, logConfig)
log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds() + 100, "a".getBytes))), leaderEpoch = 0)
@@ -1100,7 +1100,7 @@ class LogTest {
@Test
def testLogStartOffsetMovementDeletesSnapshots(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = -1, fileDeleteDelayMs = 0)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = -1, fileDeleteDelayMs = 0)
val log = createLog(logDir, logConfig)
val pid1 = 1L
val epoch = 0.toShort
@@ -1127,7 +1127,7 @@ class LogTest {
@Test
def testCompactionDeletesProducerStateSnapshots(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, cleanupPolicy = LogConfig.Compact, fileDeleteDelayMs = 0)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5, cleanupPolicy = LogConfig.Compact, fileDeleteDelayMs = 0)
val log = createLog(logDir, logConfig)
val pid1 = 1L
val epoch = 0.toShort
@@ -1171,7 +1171,7 @@ class LogTest {
def testLoadingLogDeletesProducerStateSnapshotsPastLogEndOffset(): Unit = {
val straySnapshotFile = Log.producerSnapshotFile(logDir, 42).toPath
Files.createFile(straySnapshotFile)
- val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = -1, fileDeleteDelayMs = 0)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = -1, fileDeleteDelayMs = 0)
createLog(logDir, logConfig)
assertEquals(0, ProducerStateManager.listSnapshotFiles(logDir).size,
"expected producer state snapshots greater than the log end offset to be cleaned up")
@@ -1180,7 +1180,7 @@ class LogTest {
@Test
def testProducerIdMapTruncateFullyAndStartAt(): Unit = {
val records = TestUtils.singletonRecords("foo".getBytes)
- val logConfig = LogTest.createLogConfig(segmentBytes = records.sizeInBytes, retentionBytes = records.sizeInBytes * 2)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = records.sizeInBytes, retentionBytes = records.sizeInBytes * 2)
val log = createLog(logDir, logConfig)
log.appendAsLeader(records, leaderEpoch = 0)
log.takeProducerSnapshot()
@@ -1203,7 +1203,7 @@ class LogTest {
def testProducerIdExpirationOnSegmentDeletion(): Unit = {
val pid1 = 1L
val records = TestUtils.records(Seq(new SimpleRecord("foo".getBytes)), producerId = pid1, producerEpoch = 0, sequence = 0)
- val logConfig = LogTest.createLogConfig(segmentBytes = records.sizeInBytes, retentionBytes = records.sizeInBytes * 2)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = records.sizeInBytes, retentionBytes = records.sizeInBytes * 2)
val log = createLog(logDir, logConfig)
log.appendAsLeader(records, leaderEpoch = 0)
log.takeProducerSnapshot()
@@ -1228,7 +1228,7 @@ class LogTest {
@Test
def testTakeSnapshotOnRollAndDeleteSnapshotOnRecoveryPointCheckpoint(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
val log = createLog(logDir, logConfig)
log.appendAsLeader(TestUtils.singletonRecords("a".getBytes), leaderEpoch =
0)
log.roll(Some(1L))
@@ -1256,7 +1256,7 @@ class LogTest {
@Test
def testProducerSnapshotAfterSegmentRollOnAppend(): Unit = {
val producerId = 1L
- val logConfig = LogTest.createLogConfig(segmentBytes = 1024)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024)
val log = createLog(logDir, logConfig)
log.appendAsLeader(TestUtils.records(Seq(new SimpleRecord(mockTime.milliseconds(), new Array[Byte](512))),
@@ -1288,7 +1288,7 @@ class LogTest {
@Test
def testRebuildTransactionalState(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5)
val log = createLog(logDir, logConfig)
val pid = 137L
@@ -1320,7 +1320,7 @@ class LogTest {
val producerIdExpirationCheckIntervalMs = 100
val pid = 23L
- val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
val log = createLog(logDir, logConfig, maxProducerIdExpirationMs = maxProducerIdExpirationMs,
producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs)
val records = Seq(new SimpleRecord(mockTime.milliseconds(), "foo".getBytes))
@@ -1459,7 +1459,7 @@ class LogTest {
@Test
def testDuplicateAppendToFollower(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5)
val log = createLog(logDir, logConfig)
val epoch: Short = 0
val pid = 1L
@@ -1480,7 +1480,7 @@ class LogTest {
@Test
def testMultipleProducersWithDuplicatesInSingleAppend(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5)
val log = createLog(logDir, logConfig)
val pid1 = 1L
@@ -1547,7 +1547,7 @@ class LogTest {
@Test
def testDeleteSnapshotsOnIncrementLogStartOffset(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
val log = createLog(logDir, logConfig)
val pid1 = 1L
val pid2 = 2L
@@ -1585,7 +1585,7 @@ class LogTest {
var set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
val maxJitter = 20 * 60L
// create a log
- val logConfig = LogTest.createLogConfig(segmentMs = 1 * 60 * 60L, segmentJitterMs = maxJitter)
+ val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L, segmentJitterMs = maxJitter)
val log = createLog(logDir, logConfig)
assertEquals(1, log.numberOfSegments, "Log begins with a single empty segment.")
log.appendAsLeader(set, leaderEpoch = 0)
@@ -1612,7 +1612,7 @@ class LogTest {
val msgPerSeg = 10
val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
// create a log
- val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = segmentSize)
val log = createLog(logDir, logConfig)
assertEquals(1, log.numberOfSegments, "There should be exactly 1 segment.")
@@ -1638,7 +1638,7 @@ class LogTest {
*/
@Test
def testAppendAndReadWithSequentialOffsets(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 71)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 71)
val log = createLog(logDir, logConfig)
val values = (0 until 100 by 2).map(id => id.toString.getBytes).toArray
@@ -1662,7 +1662,7 @@ class LogTest {
*/
@Test
def testAppendAndReadWithNonSequentialOffsets(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 72)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 72)
val log = createLog(logDir, logConfig)
val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
@@ -1686,7 +1686,7 @@ class LogTest {
*/
@Test
def testReadAtLogGap(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 300)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 300)
val log = createLog(logDir, logConfig)
// keep appending until we have two segments with only a single message in the second segment
@@ -1702,7 +1702,7 @@ class LogTest {
@Test
def testLogRollAfterLogHandlerClosed(): Unit = {
- val logConfig = LogTest.createLogConfig()
+ val logConfig = LogTestUtils.createLogConfig()
val log = createLog(logDir, logConfig)
log.closeHandlers()
assertThrows(classOf[KafkaStorageException], () => log.roll(Some(1L)))
@@ -1710,7 +1710,7 @@ class LogTest {
@Test
def testReadWithMinMessage(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 72)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 72)
val log = createLog(logDir, logConfig)
val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
@@ -1735,7 +1735,7 @@ class LogTest {
@Test
def testReadWithTooSmallMaxLength(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 72)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 72)
val log = createLog(logDir, logConfig)
val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
@@ -1769,7 +1769,7 @@ class LogTest {
def testReadOutOfRange(): Unit = {
createEmptyLogs(logDir, 1024)
// set up replica log starting with offset 1024 and with one message (at offset 1024)
- val logConfig = LogTest.createLogConfig(segmentBytes = 1024)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024)
val log = createLog(logDir, logConfig)
log.appendAsLeader(TestUtils.singletonRecords(value = "42".getBytes),
leaderEpoch = 0)
@@ -1787,7 +1787,7 @@ class LogTest {
@Test
def testLogRolls(): Unit = {
/* create a multipart log with 100 messages */
- val logConfig = LogTest.createLogConfig(segmentBytes = 100)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 100)
val log = createLog(logDir, logConfig)
val numMessages = 100
val messageSets = (0 until numMessages).map(i => TestUtils.singletonRecords(value = i.toString.getBytes,
@@ -1824,7 +1824,7 @@ class LogTest {
@Test
def testCompressedMessages(): Unit = {
/* this log should roll after every messageset */
- val logConfig = LogTest.createLogConfig(segmentBytes = 110)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 110)
val log = createLog(logDir, logConfig)
/* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
@@ -1848,7 +1848,7 @@ class LogTest {
for(messagesToAppend <- List(0, 1, 25)) {
logDir.mkdirs()
// first test a log segment starting at 0
- val logConfig = LogTest.createLogConfig(segmentBytes = 100, retentionMs = 0)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 100, retentionMs = 0)
val log = createLog(logDir, logConfig)
for(i <- 0 until messagesToAppend)
log.appendAsLeader(TestUtils.singletonRecords(value = i.toString.getBytes, timestamp = mockTime.milliseconds - 10), leaderEpoch = 0)
@@ -1886,7 +1886,7 @@ class LogTest {
val messageSet = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("You".getBytes), new SimpleRecord("bethe".getBytes))
// append messages to log
val configSegmentSize = messageSet.sizeInBytes - 1
- val logConfig = LogTest.createLogConfig(segmentBytes = configSegmentSize)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = configSegmentSize)
val log = createLog(logDir, logConfig)
assertThrows(classOf[RecordBatchTooLargeException], () => log.appendAsLeader(messageSet, leaderEpoch = 0))
@@ -1906,7 +1906,7 @@ class LogTest {
val messageSetWithKeyedMessage = MemoryRecords.withRecords(CompressionType.NONE, keyedMessage)
val messageSetWithKeyedMessages = MemoryRecords.withRecords(CompressionType.NONE, keyedMessage, anotherKeyedMessage)
- val logConfig = LogTest.createLogConfig(cleanupPolicy = LogConfig.Compact)
+ val logConfig = LogTestUtils.createLogConfig(cleanupPolicy = LogConfig.Compact)
val log = createLog(logDir, logConfig)
val errorMsgPrefix = "Compacted topic cannot accept message without key"
@@ -1955,7 +1955,7 @@ class LogTest {
// append messages to log
val maxMessageSize = second.sizeInBytes - 1
- val logConfig = LogTest.createLogConfig(maxMessageBytes = maxMessageSize)
+ val logConfig = LogTestUtils.createLogConfig(maxMessageBytes = maxMessageSize)
val log = createLog(logDir, logConfig)
// should be able to append the small message
@@ -1973,7 +1973,7 @@ class LogTest {
new SimpleRecord("change (I need more bytes)... blah blah
blah.".getBytes),
new SimpleRecord("More padding boo hoo".getBytes))
- val log = createLog(logDir, LogTest.createLogConfig(maxMessageBytes = second.sizeInBytes - 1))
+ val log = createLog(logDir, LogTestUtils.createLogConfig(maxMessageBytes = second.sizeInBytes - 1))
log.appendAsFollower(first)
// the second record is larger then limit but appendAsFollower does not validate the size.
@@ -1982,7 +1982,7 @@ class LogTest {
@Test
def testLogRecoversTopicId(): Unit = {
- val logConfig = LogTest.createLogConfig()
+ val logConfig = LogTestUtils.createLogConfig()
var log = createLog(logDir, logConfig)
val topicId = Uuid.randomUuid()
@@ -1998,7 +1998,7 @@ class LogTest {
@Test
def testLogFailsWhenInconsistentTopicIdSet(): Unit = {
- val logConfig = LogTest.createLogConfig()
+ val logConfig = LogTestUtils.createLogConfig()
var log = createLog(logDir, logConfig)
val topicId = Uuid.randomUuid()
@@ -2020,7 +2020,7 @@ class LogTest {
@Test
def testBuildTimeIndexWhenNotAssigningOffsets(): Unit = {
val numMessages = 100
- val logConfig = LogTest.createLogConfig(segmentBytes = 10000, indexIntervalBytes = 1)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 10000, indexIntervalBytes = 1)
val log = createLog(logDir, logConfig)
val messages = (0 until numMessages).map { i =>
@@ -2035,7 +2035,7 @@ class LogTest {
@Test
def testFetchOffsetByTimestampIncludesLeaderEpoch(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1)
val log = createLog(logDir, logConfig)
assertEquals(None, log.fetchOffsetByTimestamp(0L))
@@ -2083,7 +2083,7 @@ class LogTest {
val segmentSize = msgPerSeg * setSize // each segment will be 10 messages
// create a log
- val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = segmentSize)
val log = createLog(logDir, logConfig)
assertEquals(1, log.numberOfSegments, "There should be exactly 1 segment.")
@@ -2135,7 +2135,7 @@ class LogTest {
val setSize = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds).sizeInBytes
val msgPerSeg = 10
val segmentSize = msgPerSeg * setSize // each segment will be 10 messages
- val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize, indexIntervalBytes = setSize - 1)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = segmentSize, indexIntervalBytes = setSize - 1)
val log = createLog(logDir, logConfig)
assertEquals(1, log.numberOfSegments, "There should be exactly 1 segment.")
@@ -2175,7 +2175,7 @@ class LogTest {
def testAsyncDelete(): Unit = {
def createRecords = TestUtils.singletonRecords(value = "test".getBytes,
timestamp = mockTime.milliseconds - 1000L)
val asyncDeleteMs = 1000
- val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 10000,
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 10000,
retentionMs = 999, fileDeleteDelayMs = asyncDeleteMs)
val log = createLog(logDir, logConfig)
@@ -2299,7 +2299,7 @@ class LogTest {
@Test
def testWriteLeaderEpochCheckpointAfterDirectoryRename(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
val log = createLog(logDir, logConfig)
log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
assertEquals(Some(5), log.latestEpoch)
@@ -2315,7 +2315,7 @@ class LogTest {
@Test
def testTopicIdTransfersAfterDirectoryRename(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
val log = createLog(logDir, logConfig)
// Write a topic ID to the partition metadata file to ensure it is transferred correctly.
@@ -2342,7 +2342,7 @@ class LogTest {
@Test
def testLeaderEpochCacheClearedAfterDowngradeInAppendedMessages(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
val log = createLog(logDir, logConfig)
log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
assertEquals(Some(5), log.leaderEpochCache.flatMap(_.latestEpoch))
@@ -2355,12 +2355,12 @@ class LogTest {
@Test
def testLeaderEpochCacheClearedAfterDynamicMessageFormatDowngrade(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
val log = createLog(logDir, logConfig)
log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
assertEquals(Some(5), log.latestEpoch)
- val downgradedLogConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1,
+ val downgradedLogConfig = LogTestUtils.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1,
maxMessageBytes = 64 * 1024, messageFormatVersion = kafka.api.KAFKA_0_10_2_IV0.shortVersion)
log.updateConfig(downgradedLogConfig)
LogTestUtils.assertLeaderEpochCacheEmpty(log)
@@ -2372,14 +2372,14 @@ class LogTest {
@Test
def testLeaderEpochCacheCreatedAfterMessageFormatUpgrade(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1,
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1,
maxMessageBytes = 64 * 1024, messageFormatVersion = kafka.api.KAFKA_0_10_2_IV0.shortVersion)
val log = createLog(logDir, logConfig)
log.appendAsLeader(TestUtils.records(List(new SimpleRecord("bar".getBytes())),
magicValue = RecordVersion.V1.value), leaderEpoch = 5)
LogTestUtils.assertLeaderEpochCacheEmpty(log)
- val upgradedLogConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1,
+ val upgradedLogConfig = LogTestUtils.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1,
maxMessageBytes = 64 * 1024, messageFormatVersion = kafka.api.KAFKA_0_11_0_IV0.shortVersion)
log.updateConfig(upgradedLogConfig)
log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
@@ -2391,9 +2391,9 @@ class LogTest {
@Test
def testSplitOnOffsetOverflow(): Unit = {
// create a log such that one log segment has offsets that overflow, and call the split API on that segment
- val logConfig = LogTest.createLogConfig(indexIntervalBytes = 1, fileDeleteDelayMs = 1000)
+ val logConfig = LogTestUtils.createLogConfig(indexIntervalBytes = 1, fileDeleteDelayMs = 1000)
val (log, segmentWithOverflow) = createLogWithOffsetOverflow(logConfig)
- assertTrue(LogTest.hasOffsetOverflow(log), "At least one segment must have offset overflow")
+ assertTrue(LogTestUtils.hasOffsetOverflow(log), "At least one segment must have offset overflow")
val allRecordsBeforeSplit = LogTest.allRecords(log)
@@ -2405,7 +2405,7 @@ class LogTest {
LogTest.verifyRecordsInLog(log, allRecordsBeforeSplit)
// verify we do not have offset overflow anymore
- assertFalse(LogTest.hasOffsetOverflow(log))
+ assertFalse(LogTestUtils.hasOffsetOverflow(log))
}
@Test
@@ -2437,17 +2437,17 @@ class LogTest {
}
private def testDegenerateSplitSegmentWithOverflow(segmentBaseOffset: Long, records: List[MemoryRecords]): Unit = {
- val segment = LogTest.rawSegment(logDir, segmentBaseOffset)
+ val segment = LogTestUtils.rawSegment(logDir, segmentBaseOffset)
// Need to create the offset files explicitly to avoid triggering segment recovery to truncate segment.
Log.offsetIndexFile(logDir, segmentBaseOffset).createNewFile()
Log.timeIndexFile(logDir, segmentBaseOffset).createNewFile()
records.foreach(segment.append _)
segment.close()
- val logConfig = LogTest.createLogConfig(indexIntervalBytes = 1, fileDeleteDelayMs = 1000)
+ val logConfig = LogTestUtils.createLogConfig(indexIntervalBytes = 1, fileDeleteDelayMs = 1000)
val log = createLog(logDir, logConfig, recoveryPoint = Long.MaxValue)
- val segmentWithOverflow = LogTest.firstOverflowSegment(log).getOrElse {
+ val segmentWithOverflow = LogTestUtils.firstOverflowSegment(log).getOrElse {
throw new AssertionError("Failed to create log with a segment which has overflowed offsets")
}
@@ -2460,7 +2460,7 @@ class LogTest {
assertEquals(firstBatchBaseOffset, log.activeSegment.baseOffset)
LogTest.verifyRecordsInLog(log, allRecordsBeforeSplit)
- assertFalse(LogTest.hasOffsetOverflow(log))
+ assertFalse(LogTestUtils.hasOffsetOverflow(log))
}
@Test
@@ -2573,7 +2573,7 @@ class LogTest {
@Test
def testDeleteOldSegments(): Unit = {
def createRecords = TestUtils.singletonRecords(value = "test".getBytes,
timestamp = mockTime.milliseconds - 1000)
- val logConfig = LogTest.createLogConfig(segmentBytes =
createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes =
createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
val log = createLog(logDir, logConfig)
// append some messages to create some segments
@@ -2623,7 +2623,7 @@ class LogTest {
@Test
def testLogDeletionAfterClose(): Unit = {
def createRecords = TestUtils.singletonRecords(value = "test".getBytes,
timestamp = mockTime.milliseconds - 1000)
- val logConfig = LogTest.createLogConfig(segmentBytes =
createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes =
createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
val log = createLog(logDir, logConfig)
// append some messages to create some segments
@@ -2641,7 +2641,7 @@ class LogTest {
@Test
def testLogDeletionAfterDeleteRecords(): Unit = {
def createRecords = TestUtils.singletonRecords("test".getBytes)
- val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5)
val log = createLog(logDir, logConfig)
for (_ <- 0 until 15)
@@ -2673,7 +2673,7 @@ class LogTest {
@Test
def shouldDeleteSizeBasedSegments(): Unit = {
def createRecords = TestUtils.singletonRecords("test".getBytes)
- val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10)
val log = createLog(logDir, logConfig)
// append some messages to create some segments
@@ -2688,7 +2688,7 @@ class LogTest {
@Test
def shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize(): Unit = {
def createRecords = TestUtils.singletonRecords("test".getBytes)
- val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 15)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 15)
val log = createLog(logDir, logConfig)
// append some messages to create some segments
@@ -2703,7 +2703,7 @@ class LogTest {
@Test
def shouldDeleteTimeBasedSegmentsReadyToBeDeleted(): Unit = {
def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp
= 10)
- val logConfig = LogTest.createLogConfig(segmentBytes =
createRecords.sizeInBytes * 5, retentionMs = 10000)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes =
createRecords.sizeInBytes * 5, retentionMs = 10000)
val log = createLog(logDir, logConfig)
// append some messages to create some segments
@@ -2718,7 +2718,7 @@ class LogTest {
@Test
def shouldNotDeleteTimeBasedSegmentsWhenNoneReadyToBeDeleted(): Unit = {
def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp
= mockTime.milliseconds)
- val logConfig = LogTest.createLogConfig(segmentBytes =
createRecords.sizeInBytes * 5, retentionMs = 10000000)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes =
createRecords.sizeInBytes * 5, retentionMs = 10000000)
val log = createLog(logDir, logConfig)
// append some messages to create some segments
@@ -2733,7 +2733,7 @@ class LogTest {
@Test
def shouldNotDeleteSegmentsWhenPolicyDoesNotIncludeDelete(): Unit = {
def createRecords = TestUtils.singletonRecords("test".getBytes, key =
"test".getBytes(), timestamp = 10L)
- val logConfig = LogTest.createLogConfig(segmentBytes =
createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = "compact")
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes =
createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = "compact")
val log = createLog(logDir, logConfig)
// append some messages to create some segments
@@ -2752,7 +2752,7 @@ class LogTest {
@Test
def shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete(): Unit = {
def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes, timestamp = 10L)
- val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = "compact,delete")
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = "compact,delete")
val log = createLog(logDir, logConfig)
// append some messages to create some segments
@@ -2768,7 +2768,7 @@ class LogTest {
def shouldDeleteStartOffsetBreachedSegmentsWhenPolicyDoesNotIncludeDelete(): Unit = {
def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes, timestamp = 10L)
val recordsPerSegment = 5
- val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * recordsPerSegment, retentionMs = 10000, cleanupPolicy = "compact")
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * recordsPerSegment, retentionMs = 10000, cleanupPolicy = "compact")
val log = createLog(logDir, logConfig, brokerTopicStats)
// append some messages to create some segments
@@ -2840,7 +2840,7 @@ class LogTest {
@Test
def shouldTruncateLeaderEpochsWhenDeletingSegments(): Unit = {
def createRecords = TestUtils.singletonRecords("test".getBytes)
- val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10)
val log = createLog(logDir, logConfig)
val cache = epochCache(log)
@@ -2865,7 +2865,7 @@ class LogTest {
@Test
def shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments(): Unit = {
def createRecords = TestUtils.singletonRecords("test".getBytes)
- val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10)
val log = createLog(logDir, logConfig)
val cache = epochCache(log)
@@ -2894,7 +2894,7 @@ class LogTest {
baseOffset = startOffset, partitionLeaderEpoch = epoch)
}
- val logConfig = LogTest.createLogConfig(segmentBytes = 10 * createRecords(0, 0).sizeInBytes)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 10 * createRecords(0, 0).sizeInBytes)
val log = createLog(logDir, logConfig)
val cache = epochCache(log)
@@ -2938,7 +2938,7 @@ class LogTest {
@Test
def testFirstUnstableOffsetNoTransactionalData(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5)
val log = createLog(logDir, logConfig)
val records = MemoryRecords.withRecords(CompressionType.NONE,
@@ -2952,7 +2952,7 @@ class LogTest {
@Test
def testFirstUnstableOffsetWithTransactionalData(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5)
val log = createLog(logDir, logConfig)
val pid = 137L
@@ -2989,7 +2989,7 @@ class LogTest {
@Test
def testReadCommittedWithConcurrentHighWatermarkUpdates(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5)
val log = createLog(logDir, logConfig)
val lastOffset = 50L
@@ -3042,7 +3042,7 @@ class LogTest {
@Test
def testTransactionIndexUpdated(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5)
val log = createLog(logDir, logConfig)
val epoch = 0.toShort
@@ -3098,7 +3098,7 @@ class LogTest {
@Test
def testTransactionIndexUpdatedThroughReplication(): Unit = {
val epoch = 0.toShort
- val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5)
val log = createLog(logDir, logConfig)
val buffer = ByteBuffer.allocate(2048)
@@ -3189,7 +3189,7 @@ class LogTest {
def testZombieCoordinatorFenced(): Unit = {
val pid = 1L
val epoch = 0.toShort
- val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5)
val log = createLog(logDir, logConfig)
val append = LogTestUtils.appendTransactionalAsLeader(log, pid, epoch, mockTime)
@@ -3209,7 +3209,7 @@ class LogTest {
def testZombieCoordinatorFencedEmptyTransaction(): Unit = {
val pid = 1L
val epoch = 0.toShort
- val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5)
val log = createLog(logDir, logConfig)
val buffer = ByteBuffer.allocate(256)
@@ -3230,7 +3230,7 @@ class LogTest {
def testEndTxnWithFencedProducerEpoch(): Unit = {
val producerId = 1L
val epoch = 5.toShort
- val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5)
val log = createLog(logDir, logConfig)
LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId, epoch, ControlRecordType.ABORT, mockTime.milliseconds(), coordinatorEpoch = 1)
@@ -3240,7 +3240,7 @@ class LogTest {
@Test
def testLastStableOffsetDoesNotExceedLogStartOffsetMidSegment(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5)
val log = createLog(logDir, logConfig)
val epoch = 0.toShort
val pid = 1L
@@ -3265,7 +3265,7 @@ class LogTest {
@Test
def testLastStableOffsetDoesNotExceedLogStartOffsetAfterSegmentDeletion(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5)
val log = createLog(logDir, logConfig)
val epoch = 0.toShort
val pid = 1L
@@ -3295,7 +3295,7 @@ class LogTest {
def testAppendToTransactionIndexFailure(): Unit = {
val pid = 1L
val epoch = 0.toShort
- val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5)
val log = createLog(logDir, logConfig)
val append = LogTestUtils.appendTransactionalAsLeader(log, pid, epoch, mockTime)
@@ -3335,7 +3335,7 @@ class LogTest {
@Test
def testOffsetSnapshot(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5)
val log = createLog(logDir, logConfig)
// append a few records
@@ -3357,7 +3357,7 @@ class LogTest {
@Test
def testLastStableOffsetWithMixedProducerData(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5)
val log = createLog(logDir, logConfig)
// for convenience, both producers share the same epoch
@@ -3416,7 +3416,7 @@ class LogTest {
new SimpleRecord("b".getBytes),
new SimpleRecord("c".getBytes))
- val logConfig = LogTest.createLogConfig(segmentBytes = records.sizeInBytes)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = records.sizeInBytes)
val log = createLog(logDir, logConfig)
val firstAppendInfo = log.appendAsLeader(records, leaderEpoch = 0)
@@ -3453,7 +3453,7 @@ class LogTest {
val dirName = Log.logDeleteDirName(new TopicPartition("foo", 3))
val logDir = new File(tmpDir, dirName)
logDir.mkdirs()
- val logConfig = LogTest.createLogConfig()
+ val logConfig = LogTestUtils.createLogConfig()
val log = createLog(logDir, logConfig)
assertEquals(1, log.numberOfSegments)
}
@@ -3511,15 +3511,15 @@ class LogTest {
producerIdExpirationCheckIntervalMs: Int = LogManager.ProducerIdExpirationCheckIntervalMs,
lastShutdownClean: Boolean = true,
topicId: Option[Uuid] = None): Log = {
- LogTest.createLog(dir, config, brokerTopicStats, scheduler, time, logStartOffset, recoveryPoint,
+ LogTestUtils.createLog(dir, config, brokerTopicStats, scheduler, time, logStartOffset, recoveryPoint,
maxProducerIdExpirationMs, producerIdExpirationCheckIntervalMs, lastShutdownClean, topicId = topicId)
}
private def createLogWithOffsetOverflow(logConfig: LogConfig): (Log, LogSegment) = {
- LogTest.initializeLogDirWithOverflowedSegment(logDir)
+ LogTestUtils.initializeLogDirWithOverflowedSegment(logDir)
val log = createLog(logDir, logConfig, recoveryPoint = Long.MaxValue)
- val segmentWithOverflow = LogTest.firstOverflowSegment(log).getOrElse {
+ val segmentWithOverflow = LogTestUtils.firstOverflowSegment(log).getOrElse {
throw new AssertionError("Failed to create log with a segment which has overflowed offsets")
}
@@ -3528,126 +3528,6 @@ class LogTest {
}
object LogTest {
- def createLogConfig(segmentMs: Long = Defaults.SegmentMs,
- segmentBytes: Int = Defaults.SegmentSize,
- retentionMs: Long = Defaults.RetentionMs,
- retentionBytes: Long = Defaults.RetentionSize,
- segmentJitterMs: Long = Defaults.SegmentJitterMs,
- cleanupPolicy: String = Defaults.CleanupPolicy,
- maxMessageBytes: Int = Defaults.MaxMessageSize,
- indexIntervalBytes: Int = Defaults.IndexInterval,
- segmentIndexBytes: Int = Defaults.MaxIndexSize,
- messageFormatVersion: String = Defaults.MessageFormatVersion,
- fileDeleteDelayMs: Long = Defaults.FileDeleteDelayMs): LogConfig = {
- val logProps = new Properties()
-
- logProps.put(LogConfig.SegmentMsProp, segmentMs: java.lang.Long)
- logProps.put(LogConfig.SegmentBytesProp, segmentBytes: Integer)
- logProps.put(LogConfig.RetentionMsProp, retentionMs: java.lang.Long)
- logProps.put(LogConfig.RetentionBytesProp, retentionBytes: java.lang.Long)
- logProps.put(LogConfig.SegmentJitterMsProp, segmentJitterMs: java.lang.Long)
- logProps.put(LogConfig.CleanupPolicyProp, cleanupPolicy)
- logProps.put(LogConfig.MaxMessageBytesProp, maxMessageBytes: Integer)
- logProps.put(LogConfig.IndexIntervalBytesProp, indexIntervalBytes: Integer)
- logProps.put(LogConfig.SegmentIndexBytesProp, segmentIndexBytes: Integer)
- logProps.put(LogConfig.MessageFormatVersionProp, messageFormatVersion)
- logProps.put(LogConfig.FileDeleteDelayMsProp, fileDeleteDelayMs: java.lang.Long)
- LogConfig(logProps)
- }
-
- def createLog(dir: File,
- config: LogConfig,
- brokerTopicStats: BrokerTopicStats,
- scheduler: Scheduler,
- time: Time,
- logStartOffset: Long = 0L,
- recoveryPoint: Long = 0L,
- maxProducerIdExpirationMs: Int = 60 * 60 * 1000,
- producerIdExpirationCheckIntervalMs: Int = LogManager.ProducerIdExpirationCheckIntervalMs,
- lastShutdownClean: Boolean = true,
- topicId: Option[Uuid] = None): Log = {
- Log(dir = dir,
- config = config,
- logStartOffset = logStartOffset,
- recoveryPoint = recoveryPoint,
- scheduler = scheduler,
- brokerTopicStats = brokerTopicStats,
- time = time,
- maxProducerIdExpirationMs = maxProducerIdExpirationMs,
- producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs,
- logDirFailureChannel = new LogDirFailureChannel(10),
- lastShutdownClean = lastShutdownClean,
- topicId = topicId,
- keepPartitionMetadataFile = true)
- }
-
- /**
- * Check if the given log contains any segment with records that cause offset overflow.
- * @param log Log to check
- * @return true if log contains at least one segment with offset overflow; false otherwise
- */
- def hasOffsetOverflow(log: Log): Boolean = firstOverflowSegment(log).isDefined
-
- def firstOverflowSegment(log: Log): Option[LogSegment] = {
- def hasOverflow(baseOffset: Long, batch: RecordBatch): Boolean =
- batch.lastOffset > baseOffset + Int.MaxValue || batch.baseOffset < baseOffset
-
- for (segment <- log.logSegments) {
- val overflowBatch = segment.log.batches.asScala.find(batch => hasOverflow(segment.baseOffset, batch))
- if (overflowBatch.isDefined)
- return Some(segment)
- }
- None
- }
-
- private def rawSegment(logDir: File, baseOffset: Long): FileRecords =
- FileRecords.open(Log.logFile(logDir, baseOffset))
-
- /**
- * Initialize the given log directory with a set of segments, one of which will have an
- * offset which overflows the segment
- */
- def initializeLogDirWithOverflowedSegment(logDir: File): Unit = {
- def writeSampleBatches(baseOffset: Long, segment: FileRecords): Long = {
- def record(offset: Long) = {
- val data = offset.toString.getBytes
- new SimpleRecord(data, data)
- }
-
- segment.append(MemoryRecords.withRecords(baseOffset, CompressionType.NONE, 0,
- record(baseOffset)))
- segment.append(MemoryRecords.withRecords(baseOffset + 1, CompressionType.NONE, 0,
- record(baseOffset + 1),
- record(baseOffset + 2)))
- segment.append(MemoryRecords.withRecords(baseOffset + Int.MaxValue - 1, CompressionType.NONE, 0,
- record(baseOffset + Int.MaxValue - 1)))
- // Need to create the offset files explicitly to avoid triggering segment recovery to truncate segment.
- Log.offsetIndexFile(logDir, baseOffset).createNewFile()
- Log.timeIndexFile(logDir, baseOffset).createNewFile()
- baseOffset + Int.MaxValue
- }
-
- def writeNormalSegment(baseOffset: Long): Long = {
- val segment = rawSegment(logDir, baseOffset)
- try writeSampleBatches(baseOffset, segment)
- finally segment.close()
- }
-
- def writeOverflowSegment(baseOffset: Long): Long = {
- val segment = rawSegment(logDir, baseOffset)
- try {
- val nextOffset = writeSampleBatches(baseOffset, segment)
- writeSampleBatches(nextOffset, segment)
- } finally segment.close()
- }
-
- // We create three segments, the second of which contains offsets which overflow
- var nextOffset = 0L
- nextOffset = writeNormalSegment(nextOffset)
- nextOffset = writeOverflowSegment(nextOffset)
- writeNormalSegment(nextOffset)
- }
-
def allRecords(log: Log): List[Record] = {
val recordsFound = ListBuffer[Record]()
for (logSegment <- log.logSegments) {
@@ -3661,12 +3541,4 @@ object LogTest {
def verifyRecordsInLog(log: Log, expectedRecords: List[Record]): Unit = {
assertEquals(expectedRecords, allRecords(log))
}
-
- /* extract all the keys from a log */
- def keysInLog(log: Log): Iterable[Long] = {
- for (logSegment <- log.logSegments;
- batch <- logSegment.log.batches.asScala if !batch.isControlBatch;
- record <- batch.asScala if record.hasValue && record.hasKey)
- yield TestUtils.readString(record.key).toLong
- }
}
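For reference, a minimal usage sketch of the consolidated helpers (illustrative only; it assumes the usual fixtures of this suite, such as logDir, brokerTopicStats and mockTime, are in scope):

    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024, retentionMs = 999)
    val log = LogTestUtils.createLog(logDir, logConfig, brokerTopicStats, mockTime.scheduler, mockTime)
    // helpers such as keysInLog and hasOffsetOverflow are reached the same way after this change
    assertFalse(LogTestUtils.hasOffsetOverflow(log))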
diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
index 29777b2..0169cd4 100644
--- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
@@ -119,7 +119,7 @@ object LogTestUtils {
None
}
- private def rawSegment(logDir: File, baseOffset: Long): FileRecords =
+ def rawSegment(logDir: File, baseOffset: Long): FileRecords =
FileRecords.open(Log.logFile(logDir, baseOffset))
/**
@@ -185,8 +185,8 @@ object LogTestUtils {
assertFalse(file.getName.endsWith(Log.CleanedFileSuffix), "Unexpected
.cleaned file after recovery")
assertFalse(file.getName.endsWith(Log.SwapFileSuffix), "Unexpected .swap
file after recovery")
}
- assertEquals(expectedKeys, LogTest.keysInLog(recoveredLog))
- assertFalse(LogTest.hasOffsetOverflow(recoveredLog))
+ assertEquals(expectedKeys, keysInLog(recoveredLog))
+ assertFalse(hasOffsetOverflow(recoveredLog))
recoveredLog
}
@@ -214,10 +214,10 @@ object LogTestUtils {
}
def readLog(log: Log,
- startOffset: Long,
- maxLength: Int,
- isolation: FetchIsolation = FetchLogEnd,
- minOneMessage: Boolean = true): FetchDataInfo = {
+ startOffset: Long,
+ maxLength: Int,
+ isolation: FetchIsolation = FetchLogEnd,
+ minOneMessage: Boolean = true): FetchDataInfo = {
log.read(startOffset, maxLength, isolation, minOneMessage)
}
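As a usage sketch of the readLog signature above (defaults for isolation and minOneMessage taken from the declaration; `log` is assumed to be a Log created elsewhere in the suite):

    val fetchInfo = LogTestUtils.readLog(log, startOffset = 0L, maxLength = 2048)
    // inspect fetchInfo.records for the batches returned from offset 0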
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index 965dd2d..329c00f 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -21,7 +21,7 @@ import java.io.{ByteArrayOutputStream, File, PrintWriter}
import java.nio.ByteBuffer
import java.util
import java.util.Properties
-import kafka.log.{Log, LogConfig, LogManager, LogTest}
+import kafka.log.{Log, LogConfig, LogManager, LogTestUtils}
import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
import kafka.tools.DumpLogSegments.TimeIndexDumpErrors
import kafka.utils.{MockTime, TestUtils}
@@ -181,8 +181,8 @@ class DumpLogSegmentsTest {
@Test
def testDumpMetadataRecords(): Unit = {
val mockTime = new MockTime
- val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
- val log = LogTest.createLog(logDir, logConfig, new BrokerTopicStats, mockTime.scheduler, mockTime)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
+ val log = LogTestUtils.createLog(logDir, logConfig, new BrokerTopicStats, mockTime.scheduler, mockTime)
val metadataRecords = Seq(
new ApiMessageAndVersion(