This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b706eb4  MINOR: Eliminate redundant functions in LogTest suite (#10732)
b706eb4 is described below

commit b706eb4a9549f9d8448ff5a889c02e48ee07d71d
Author: Kowshik Prakasam <[email protected]>
AuthorDate: Thu May 20 08:46:34 2021 -0700

    MINOR: Eliminate redundant functions in LogTest suite (#10732)
    
    Reviewers: Satish Duggana <[email protected]>, Jun Rao <[email protected]>
---
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 104 +++---
 .../test/scala/unit/kafka/log/LogLoaderTest.scala  |  12 +-
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 348 +++++++--------------
 .../test/scala/unit/kafka/log/LogTestUtils.scala   |  14 +-
 .../unit/kafka/tools/DumpLogSegmentsTest.scala     |   6 +-
 5 files changed, 178 insertions(+), 306 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 801ba26..99ff1aa 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -72,7 +72,7 @@ class LogCleanerTest {
     // append messages to the log until we have four segments
     while(log.numberOfSegments < 4)
       log.appendAsLeader(record(log.logEndOffset.toInt, 
log.logEndOffset.toInt), leaderEpoch = 0)
-    val keysFound = LogTest.keysInLog(log)
+    val keysFound = LogTestUtils.keysInLog(log)
     assertEquals(0L until log.logEndOffset, keysFound)
 
     // pretend we have the following keys
@@ -85,8 +85,8 @@ class LogCleanerTest {
     val stats = new CleanerStats()
     val expectedBytesRead = segments.map(_.size).sum
     cleaner.cleanSegments(log, segments, map, 0L, stats, new 
CleanedTransactionMetadata)
-    val shouldRemain = LogTest.keysInLog(log).filter(!keys.contains(_))
-    assertEquals(shouldRemain, LogTest.keysInLog(log))
+    val shouldRemain = LogTestUtils.keysInLog(log).filter(!keys.contains(_))
+    assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
     assertEquals(expectedBytesRead, stats.bytesRead)
   }
 
@@ -230,7 +230,7 @@ class LogCleanerTest {
     cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, 
log.activeSegment.baseOffset))
     assertEquals(List(2, 5, 7), lastOffsetsPerBatchInLog(log))
     assertEquals(Map(pid1 -> 2, pid2 -> 2, pid3 -> 1), lastSequencesInLog(log))
-    assertEquals(List(2, 3, 1, 4), LogTest.keysInLog(log))
+    assertEquals(List(2, 3, 1, 4), LogTestUtils.keysInLog(log))
     assertEquals(List(1, 3, 6, 7), offsetsInLog(log))
 
     // we have to reload the log to validate that the cleaner maintained 
sequence numbers correctly
@@ -262,7 +262,7 @@ class LogCleanerTest {
     cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, 
log.activeSegment.baseOffset))
     assertEquals(Map(pid1 -> 2, pid2 -> 2, pid3 -> 1, pid4 -> 0), 
lastSequencesInLog(log))
     assertEquals(List(2, 5, 7, 8), lastOffsetsPerBatchInLog(log))
-    assertEquals(List(3, 1, 4, 2), LogTest.keysInLog(log))
+    assertEquals(List(3, 1, 4, 2), LogTestUtils.keysInLog(log))
     assertEquals(List(3, 6, 7, 8), offsetsInLog(log))
 
     reloadLog()
@@ -299,7 +299,7 @@ class LogCleanerTest {
 
     log.roll()
     cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, 
log.activeSegment.baseOffset))
-    assertEquals(List(3, 2), LogTest.keysInLog(log))
+    assertEquals(List(3, 2), LogTestUtils.keysInLog(log))
     assertEquals(List(3, 6, 7, 8, 9), offsetsInLog(log))
 
     // ensure the transaction index is still correct
@@ -339,7 +339,7 @@ class LogCleanerTest {
 
     // we have only cleaned the records in the first segment
     val dirtyOffset = cleaner.clean(LogToClean(new TopicPartition("test", 0), 
log, 0L, log.activeSegment.baseOffset))._1
-    assertEquals(List(2, 3, 5, 6, 6, 7, 7, 8, 8, 9, 9, 10), 
LogTest.keysInLog(log))
+    assertEquals(List(2, 3, 5, 6, 6, 7, 7, 8, 8, 9, 9, 10), 
LogTestUtils.keysInLog(log))
 
     log.roll()
 
@@ -349,7 +349,7 @@ class LogCleanerTest {
 
     // finally only the keys from pid3 should remain
     cleaner.clean(LogToClean(new TopicPartition("test", 0), log, dirtyOffset, 
log.activeSegment.baseOffset))
-    assertEquals(List(2, 3, 6, 7, 8, 9, 11, 12), LogTest.keysInLog(log))
+    assertEquals(List(2, 3, 6, 7, 8, 9, 11, 12), LogTestUtils.keysInLog(log))
   }
 
   @Test
@@ -373,7 +373,7 @@ class LogCleanerTest {
 
     // cannot remove the marker in this pass because there are still valid 
records
     var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, 
log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
-    assertEquals(List(1, 3, 2), LogTest.keysInLog(log))
+    assertEquals(List(1, 3, 2), LogTestUtils.keysInLog(log))
     assertEquals(List(0, 2, 3, 4, 5), offsetsInLog(log))
 
     appendProducer(Seq(1, 3))
@@ -382,17 +382,17 @@ class LogCleanerTest {
 
     // the first cleaning preserves the commit marker (at offset 3) since 
there were still records for the transaction
     dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 
log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
-    assertEquals(List(2, 1, 3), LogTest.keysInLog(log))
+    assertEquals(List(2, 1, 3), LogTestUtils.keysInLog(log))
     assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log))
 
     // delete horizon forced to 0 to verify marker is not removed early
     dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 
log.activeSegment.baseOffset), deleteHorizonMs = 0L)._1
-    assertEquals(List(2, 1, 3), LogTest.keysInLog(log))
+    assertEquals(List(2, 1, 3), LogTestUtils.keysInLog(log))
     assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log))
 
     // clean again with large delete horizon and verify the marker is removed
     dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 
log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
-    assertEquals(List(2, 1, 3), LogTest.keysInLog(log))
+    assertEquals(List(2, 1, 3), LogTestUtils.keysInLog(log))
     assertEquals(List(4, 5, 6, 7, 8), offsetsInLog(log))
   }
 
@@ -421,11 +421,11 @@ class LogCleanerTest {
     log.roll()
 
     cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), 
deleteHorizonMs = Long.MaxValue)
-    assertEquals(List(2), LogTest.keysInLog(log))
+    assertEquals(List(2), LogTestUtils.keysInLog(log))
     assertEquals(List(1, 3, 4), offsetsInLog(log))
 
     cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), 
deleteHorizonMs = Long.MaxValue)
-    assertEquals(List(2), LogTest.keysInLog(log))
+    assertEquals(List(2), LogTestUtils.keysInLog(log))
     assertEquals(List(3, 4), offsetsInLog(log))
   }
 
@@ -462,14 +462,14 @@ class LogCleanerTest {
     // first time through the records are removed
     // Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, 
{Producer2: Commit}, {2}, {3}, {Producer1: Commit}]
     var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, 
log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
-    assertEquals(List(2, 3), LogTest.keysInLog(log))
+    assertEquals(List(2, 3), LogTestUtils.keysInLog(log))
     assertEquals(List(4, 5, 6, 7), offsetsInLog(log))
     assertEquals(List(1, 3, 4, 5, 6, 7), lastOffsetsPerBatchInLog(log))
 
     // the empty batch remains if cleaned again because it still holds the 
last sequence
     // Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, 
{Producer2: Commit}, {2}, {3}, {Producer1: Commit}]
     dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 
log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
-    assertEquals(List(2, 3), LogTest.keysInLog(log))
+    assertEquals(List(2, 3), LogTestUtils.keysInLog(log))
     assertEquals(List(4, 5, 6, 7), offsetsInLog(log))
     assertEquals(List(1, 3, 4, 5, 6, 7), lastOffsetsPerBatchInLog(log))
 
@@ -483,13 +483,13 @@ class LogCleanerTest {
 
     // Expected State: [{Producer1: EmptyBatch}, {Producer2: Commit}, {2}, 
{3}, {Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}]
     dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 
log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
-    assertEquals(List(2, 3, 1), LogTest.keysInLog(log))
+    assertEquals(List(2, 3, 1), LogTestUtils.keysInLog(log))
     assertEquals(List(4, 5, 6, 7, 8, 9), offsetsInLog(log))
     assertEquals(List(1, 4, 5, 6, 7, 8, 9), lastOffsetsPerBatchInLog(log))
 
     // Expected State: [{Producer1: EmptyBatch}, {2}, {3}, {Producer1: 
Commit}, {Producer2: 1}, {Producer2: Commit}]
     dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 
log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
-    assertEquals(List(2, 3, 1), LogTest.keysInLog(log))
+    assertEquals(List(2, 3, 1), LogTestUtils.keysInLog(log))
     assertEquals(List(5, 6, 7, 8, 9), offsetsInLog(log))
     assertEquals(List(1, 5, 6, 7, 8, 9), lastOffsetsPerBatchInLog(log))
   }
@@ -514,14 +514,14 @@ class LogCleanerTest {
     // first time through the control batch is retained as an empty batch
     // Expected State: [{Producer1: EmptyBatch}], [{2}, {3}]
     var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, 
log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
-    assertEquals(List(2, 3), LogTest.keysInLog(log))
+    assertEquals(List(2, 3), LogTestUtils.keysInLog(log))
     assertEquals(List(1, 2), offsetsInLog(log))
     assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log))
 
     // the empty control batch does not cause an exception when cleaned
     // Expected State: [{Producer1: EmptyBatch}], [{2}, {3}]
     dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 
log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
-    assertEquals(List(2, 3), LogTest.keysInLog(log))
+    assertEquals(List(2, 3), LogTestUtils.keysInLog(log))
     assertEquals(List(1, 2), offsetsInLog(log))
     assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log))
   }
@@ -600,12 +600,12 @@ class LogCleanerTest {
 
     // delete horizon set to 0 to verify marker is not removed early
     val dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, 
log.activeSegment.baseOffset), deleteHorizonMs = 0L)._1
-    assertEquals(List(3), LogTest.keysInLog(log))
+    assertEquals(List(3), LogTestUtils.keysInLog(log))
     assertEquals(List(3, 4, 5), offsetsInLog(log))
 
     // clean again with large delete horizon and verify the marker is removed
     cleaner.doClean(LogToClean(tp, log, dirtyOffset, 
log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
-    assertEquals(List(3), LogTest.keysInLog(log))
+    assertEquals(List(3), LogTestUtils.keysInLog(log))
     assertEquals(List(4, 5), offsetsInLog(log))
   }
 
@@ -679,14 +679,14 @@ class LogCleanerTest {
     // first time through the records are removed
     var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, 
log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
     assertAbortedTransactionIndexed()
-    assertEquals(List(), LogTest.keysInLog(log))
+    assertEquals(List(), LogTestUtils.keysInLog(log))
     assertEquals(List(2), offsetsInLog(log)) // abort marker is retained
     assertEquals(List(1, 2), lastOffsetsPerBatchInLog(log)) // empty batch is 
retained
 
     // the empty batch remains if cleaned again because it still holds the 
last sequence
     dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 
log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
     assertAbortedTransactionIndexed()
-    assertEquals(List(), LogTest.keysInLog(log))
+    assertEquals(List(), LogTestUtils.keysInLog(log))
     assertEquals(List(2), offsetsInLog(log)) // abort marker is still retained
     assertEquals(List(1, 2), lastOffsetsPerBatchInLog(log)) // empty batch is 
retained
 
@@ -696,12 +696,12 @@ class LogCleanerTest {
 
     dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 
log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
     assertAbortedTransactionIndexed()
-    assertEquals(List(1), LogTest.keysInLog(log))
+    assertEquals(List(1), LogTestUtils.keysInLog(log))
     assertEquals(List(2, 3), offsetsInLog(log)) // abort marker is not yet 
gone because we read the empty batch
     assertEquals(List(2, 3), lastOffsetsPerBatchInLog(log)) // but we do not 
preserve the empty batch
 
     dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 
log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
-    assertEquals(List(1), LogTest.keysInLog(log))
+    assertEquals(List(1), LogTestUtils.keysInLog(log))
     assertEquals(List(3), offsetsInLog(log)) // abort marker is gone
     assertEquals(List(3), lastOffsetsPerBatchInLog(log))
 
@@ -725,7 +725,7 @@ class LogCleanerTest {
 
     while(log.numberOfSegments < 2)
       log.appendAsLeader(record(log.logEndOffset.toInt, 
Array.fill(largeMessageSize)(0: Byte)), leaderEpoch = 0)
-    val keysFound = LogTest.keysInLog(log)
+    val keysFound = LogTestUtils.keysInLog(log)
     assertEquals(0L until log.logEndOffset, keysFound)
 
     // pretend we have the following keys
@@ -736,8 +736,8 @@ class LogCleanerTest {
     // clean the log
     val stats = new CleanerStats()
     cleaner.cleanSegments(log, Seq(log.logSegments.head), map, 0L, stats, new 
CleanedTransactionMetadata)
-    val shouldRemain = LogTest.keysInLog(log).filter(!keys.contains(_))
-    assertEquals(shouldRemain, LogTest.keysInLog(log))
+    val shouldRemain = LogTestUtils.keysInLog(log).filter(!keys.contains(_))
+    assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
   }
 
   /**
@@ -749,8 +749,8 @@ class LogCleanerTest {
 
     val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
     cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new 
CleanerStats, new CleanedTransactionMetadata)
-    val shouldRemain = LogTest.keysInLog(log).filter(k => 
!offsetMap.map.containsKey(k.toString))
-    assertEquals(shouldRemain, LogTest.keysInLog(log))
+    val shouldRemain = LogTestUtils.keysInLog(log).filter(k => 
!offsetMap.map.containsKey(k.toString))
+    assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
   }
 
   /**
@@ -797,7 +797,7 @@ class LogCleanerTest {
 
     while(log.numberOfSegments < 2)
       log.appendAsLeader(record(log.logEndOffset.toInt, 
Array.fill(largeMessageSize)(0: Byte)), leaderEpoch = 0)
-    val keysFound = LogTest.keysInLog(log)
+    val keysFound = LogTestUtils.keysInLog(log)
     assertEquals(0L until log.logEndOffset, keysFound)
 
     // Decrease the log's max message size
@@ -834,7 +834,7 @@ class LogCleanerTest {
       log.appendAsLeader(record(log.logEndOffset.toInt, 
log.logEndOffset.toInt), leaderEpoch = 0)
 
     cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, 
log.activeSegment.baseOffset))
-    val keys = LogTest.keysInLog(log).toSet
+    val keys = LogTestUtils.keysInLog(log).toSet
     assertTrue((0 until leo.toInt by 2).forall(!keys.contains(_)), "None of 
the keys we deleted should still exist.")
   }
 
@@ -886,7 +886,7 @@ class LogCleanerTest {
     cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, 
log.activeSegment.baseOffset))
     assertEquals(List(1, 3, 4), lastOffsetsPerBatchInLog(log))
     assertEquals(Map(1L -> 0, 2L -> 1, 3L -> 0), lastSequencesInLog(log))
-    assertEquals(List(0, 1), LogTest.keysInLog(log))
+    assertEquals(List(0, 1), LogTestUtils.keysInLog(log))
     assertEquals(List(3, 4), offsetsInLog(log))
   }
 
@@ -909,7 +909,7 @@ class LogCleanerTest {
     cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, 
log.activeSegment.baseOffset))
     assertEquals(List(2, 3), lastOffsetsPerBatchInLog(log))
     assertEquals(Map(producerId -> 2), lastSequencesInLog(log))
-    assertEquals(List(), LogTest.keysInLog(log))
+    assertEquals(List(), LogTestUtils.keysInLog(log))
     assertEquals(List(3), offsetsInLog(log))
 
     // Append a new entry from the producer and verify that the empty batch is 
cleaned up
@@ -919,7 +919,7 @@ class LogCleanerTest {
 
     assertEquals(List(3, 5), lastOffsetsPerBatchInLog(log))
     assertEquals(Map(producerId -> 4), lastSequencesInLog(log))
-    assertEquals(List(1, 5), LogTest.keysInLog(log))
+    assertEquals(List(1, 5), LogTestUtils.keysInLog(log))
     assertEquals(List(3, 4, 5), offsetsInLog(log))
   }
 
@@ -942,16 +942,16 @@ class LogCleanerTest {
 
     // clean the log with only one message removed
     cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 2, 
log.activeSegment.baseOffset))
-    assertEquals(List(1,0,1,0), LogTest.keysInLog(log))
+    assertEquals(List(1,0,1,0), LogTestUtils.keysInLog(log))
     assertEquals(List(1,2,3,4), offsetsInLog(log))
 
     // continue to make progress, even though we can only clean one message at 
a time
     cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 3, 
log.activeSegment.baseOffset))
-    assertEquals(List(0,1,0), LogTest.keysInLog(log))
+    assertEquals(List(0,1,0), LogTestUtils.keysInLog(log))
     assertEquals(List(2,3,4), offsetsInLog(log))
 
     cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 4, 
log.activeSegment.baseOffset))
-    assertEquals(List(1,0), LogTest.keysInLog(log))
+    assertEquals(List(1,0), LogTestUtils.keysInLog(log))
     assertEquals(List(3,4), offsetsInLog(log))
   }
 
@@ -1114,7 +1114,7 @@ class LogCleanerTest {
     while(log.numberOfSegments < 4)
       log.appendAsLeader(record(log.logEndOffset.toInt, 
log.logEndOffset.toInt), leaderEpoch = 0)
 
-    val keys = LogTest.keysInLog(log)
+    val keys = LogTestUtils.keysInLog(log)
     val map = new FakeOffsetMap(Int.MaxValue)
     keys.foreach(k => map.put(key(k), Long.MaxValue))
     assertThrows(classOf[LogCleaningAbortedException], () =>
@@ -1312,15 +1312,15 @@ class LogCleanerTest {
     logProps.put(LogConfig.FileDeleteDelayMsProp, 1000: java.lang.Integer)
     val config = LogConfig.fromProps(logConfig.originals, logProps)
 
-    LogTest.initializeLogDirWithOverflowedSegment(dir)
+    LogTestUtils.initializeLogDirWithOverflowedSegment(dir)
 
     val log = makeLog(config = config, recoveryPoint = Long.MaxValue)
-    val segmentWithOverflow = LogTest.firstOverflowSegment(log).getOrElse {
+    val segmentWithOverflow = LogTestUtils.firstOverflowSegment(log).getOrElse 
{
       throw new AssertionError("Failed to create log with a segment which has 
overflowed offsets")
     }
 
     val numSegmentsInitial = log.logSegments.size
-    val allKeys = LogTest.keysInLog(log).toList
+    val allKeys = LogTestUtils.keysInLog(log).toList
     val expectedKeysAfterCleaning = new mutable.ArrayBuffer[Long]()
 
     // pretend we want to clean every alternate key
@@ -1336,15 +1336,15 @@ class LogCleanerTest {
         new CleanedTransactionMetadata)
     )
     assertEquals(numSegmentsInitial + 1, log.logSegments.size)
-    assertEquals(allKeys, LogTest.keysInLog(log))
-    assertFalse(LogTest.hasOffsetOverflow(log))
+    assertEquals(allKeys, LogTestUtils.keysInLog(log))
+    assertFalse(LogTestUtils.hasOffsetOverflow(log))
 
     // Clean each segment now that split is complete.
     for (segmentToClean <- log.logSegments)
       cleaner.cleanSegments(log, List(segmentToClean), offsetMap, 0L, new 
CleanerStats(),
         new CleanedTransactionMetadata)
-    assertEquals(expectedKeysAfterCleaning, LogTest.keysInLog(log))
-    assertFalse(LogTest.hasOffsetOverflow(log))
+    assertEquals(expectedKeysAfterCleaning, LogTestUtils.keysInLog(log))
+    assertFalse(LogTestUtils.hasOffsetOverflow(log))
     log.close()
   }
 
@@ -1374,7 +1374,7 @@ class LogCleanerTest {
       log.appendAsLeader(record(log.logEndOffset.toInt, 
log.logEndOffset.toInt), leaderEpoch = 0)
       messageCount += 1
     }
-    val allKeys = LogTest.keysInLog(log)
+    val allKeys = LogTestUtils.keysInLog(log)
 
     // pretend we have odd-numbered keys
     val offsetMap = new FakeOffsetMap(Int.MaxValue)
@@ -1386,7 +1386,7 @@ class LogCleanerTest {
       new CleanedTransactionMetadata)
     // clear scheduler so that async deletes don't run
     time.scheduler.clear()
-    var cleanedKeys = LogTest.keysInLog(log)
+    var cleanedKeys = LogTestUtils.keysInLog(log)
     log.close()
 
     // 1) Simulate recovery just after .cleaned file is created, before rename 
to .swap
@@ -1402,7 +1402,7 @@ class LogCleanerTest {
       new CleanedTransactionMetadata)
     // clear scheduler so that async deletes don't run
     time.scheduler.clear()
-    cleanedKeys = LogTest.keysInLog(log)
+    cleanedKeys = LogTestUtils.keysInLog(log)
     log.close()
 
     // 2) Simulate recovery just after swap file is created, before old 
segment files are
@@ -1424,7 +1424,7 @@ class LogCleanerTest {
       new CleanedTransactionMetadata)
     // clear scheduler so that async deletes don't run
     time.scheduler.clear()
-    cleanedKeys = LogTest.keysInLog(log)
+    cleanedKeys = LogTestUtils.keysInLog(log)
 
     // 3) Simulate recovery after swap file is created and old segments files 
are renamed
     //    to .deleted. Clean operation is resumed during recovery.
@@ -1442,7 +1442,7 @@ class LogCleanerTest {
       new CleanedTransactionMetadata)
     // clear scheduler so that async deletes don't run
     time.scheduler.clear()
-    cleanedKeys = LogTest.keysInLog(log)
+    cleanedKeys = LogTestUtils.keysInLog(log)
     log.close()
 
     // 4) Simulate recovery after swap is complete, but async deletion
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index fd75779..0546db4 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -1341,7 +1341,7 @@ class LogLoaderTest {
 
   @Test
   def testFullTransactionIndexRecovery(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 128 * 5)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 128 * 5)
     val log = createLog(logDir, logConfig)
     val epoch = 0.toShort
 
@@ -1384,7 +1384,7 @@ class LogLoaderTest {
 
     log.close()
 
-    val reloadedLogConfig = LogTest.createLogConfig(segmentBytes = 1024 * 5)
+    val reloadedLogConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 
5)
     val reloadedLog = createLog(logDir, reloadedLogConfig, lastShutdownClean = 
false)
     val abortedTransactions = LogTestUtils.allAbortedTransactions(reloadedLog)
     assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 
8L, 74L, 36L)), abortedTransactions)
@@ -1392,7 +1392,7 @@ class LogLoaderTest {
 
   @Test
   def testRecoverOnlyLastSegment(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 128 * 5)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 128 * 5)
     val log = createLog(logDir, logConfig)
     val epoch = 0.toShort
 
@@ -1435,7 +1435,7 @@ class LogLoaderTest {
 
     log.close()
 
-    val reloadedLogConfig = LogTest.createLogConfig(segmentBytes = 1024 * 5)
+    val reloadedLogConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 
5)
     val reloadedLog = createLog(logDir, reloadedLogConfig, recoveryPoint = 
recoveryPoint, lastShutdownClean = false)
     val abortedTransactions = LogTestUtils.allAbortedTransactions(reloadedLog)
     assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 
8L, 74L, 36L)), abortedTransactions)
@@ -1443,7 +1443,7 @@ class LogLoaderTest {
 
   @Test
   def testRecoverLastSegmentWithNoSnapshots(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 128 * 5)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 128 * 5)
     val log = createLog(logDir, logConfig)
     val epoch = 0.toShort
 
@@ -1489,7 +1489,7 @@ class LogLoaderTest {
 
     log.close()
 
-    val reloadedLogConfig = LogTest.createLogConfig(segmentBytes = 1024 * 5)
+    val reloadedLogConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 
5)
     val reloadedLog = createLog(logDir, reloadedLogConfig, recoveryPoint = 
recoveryPoint, lastShutdownClean = false)
     val abortedTransactions = LogTestUtils.allAbortedTransactions(reloadedLog)
     assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 
8L, 74L, 36L)), abortedTransactions)
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index c8891eb..3054116 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -22,13 +22,13 @@ import java.nio.ByteBuffer
 import java.nio.file.Files
 import java.util.concurrent.{Callable, Executors}
 import java.util.regex.Pattern
-import java.util.{Collections, Optional, Properties}
+import java.util.{Collections, Optional}
 import kafka.common.{OffsetsOutOfOrderException, RecordValidationException, 
UnexpectedAppendOffsetException}
 import kafka.log.Log.DeleteDirSuffix
 import kafka.metrics.KafkaYammerMetrics
 import kafka.server.checkpoints.LeaderEpochCheckpointFile
 import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
-import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchIsolation, 
FetchLogEnd, FetchTxnCommitted, KafkaConfig, LogDirFailureChannel, 
LogOffsetMetadata, PartitionMetadataFile}
+import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchIsolation, 
FetchLogEnd, FetchTxnCommitted, KafkaConfig, LogOffsetMetadata, 
PartitionMetadataFile}
 import kafka.utils._
 import org.apache.kafka.common.{InvalidRecordException, KafkaException, 
TopicPartition, Uuid}
 import org.apache.kafka.common.errors._
@@ -43,7 +43,7 @@ import org.easymock.EasyMock
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 
-import scala.collection.{Iterable, Map}
+import scala.collection.Map
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ListBuffer
 
@@ -76,7 +76,7 @@ class LogTest {
 
   @Test
   def testHighWatermarkMetadataUpdatedAfterSegmentRoll(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
     val log = createLog(logDir, logConfig)
 
     def assertFetchSizeAndOffsets(fetchOffset: Long,
@@ -112,7 +112,7 @@ class LogTest {
 
   @Test
   def testAppendAsLeaderWithRaftLeader(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
     val log = createLog(logDir, logConfig)
     val leaderEpoch = 0
 
@@ -137,7 +137,7 @@ class LogTest {
 
   @Test
   def testAppendInfoFirstOffset(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
     val log = createLog(logDir, logConfig)
 
     val simpleRecords = List(
@@ -179,7 +179,7 @@ class LogTest {
     // resets the producer state. Specifically we are testing the case when
     // the segment position of the first unstable offset is unknown.
 
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
     val log = createLog(logDir, logConfig)
 
     val producerId = 17L
@@ -217,7 +217,7 @@ class LogTest {
 
   @Test
   def testHighWatermarkMaintenance(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
     val log = createLog(logDir, logConfig)
     val leaderEpoch = 0
 
@@ -303,7 +303,7 @@ class LogTest {
 
   @Test
   def testFetchUpToLogEndOffset(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
     val log = createLog(logDir, logConfig)
 
     log.appendAsLeader(TestUtils.records(List(
@@ -323,7 +323,7 @@ class LogTest {
 
   @Test
   def testFetchUpToHighWatermark(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
     val log = createLog(logDir, logConfig)
 
     log.appendAsLeader(TestUtils.records(List(
@@ -357,7 +357,7 @@ class LogTest {
 
   @Test
   def testActiveProducers(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
     val log = createLog(logDir, logConfig)
 
     def assertProducerState(
@@ -415,7 +415,7 @@ class LogTest {
 
   @Test
   def testFetchUpToLastStableOffset(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
     val log = createLog(logDir, logConfig)
     val epoch = 0.toShort
 
@@ -501,7 +501,7 @@ class LogTest {
   @Test
   def testTimeBasedLogRoll(): Unit = {
     def createRecords = TestUtils.singletonRecords("test".getBytes)
-    val logConfig = LogTest.createLogConfig(segmentMs = 1 * 60 * 60L)
+    val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L)
 
     // create a log
     val log = createLog(logDir, logConfig, maxProducerIdExpirationMs = 24 * 60)
@@ -548,7 +548,7 @@ class LogTest {
 
   @Test
   def testRollSegmentThatAlreadyExists(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentMs = 1 * 60 * 60L)
+    val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L)
 
     // create a log
     val log = createLog(logDir, logConfig)
@@ -674,7 +674,7 @@ class LogTest {
     val msgPerSeg = 10
     val segmentSize = msgPerSeg * setSize  // each segment will be 10 messages
     // create a log
-    val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = segmentSize)
     val log = createLog(logDir, logConfig)
     assertEquals(1, log.numberOfSegments, "There should be exactly 1 segment.")
 
@@ -701,7 +701,7 @@ class LogTest {
   def testInitializationOfProducerSnapshotsUpgradePath(): Unit = {
     // simulate the upgrade path by creating a new log with several segments, 
deleting the
     // snapshot files, and then reloading the log
-    val logConfig = LogTest.createLogConfig(segmentBytes = 64 * 10)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 64 * 10)
     var log = createLog(logDir, logConfig)
     assertEquals(None, log.oldestProducerSnapshotOffset)
 
@@ -740,7 +740,7 @@ class LogTest {
 
   @Test
   def testLogReinitializeAfterManualDelete(): Unit = {
-    val logConfig = LogTest.createLogConfig()
+    val logConfig = LogTestUtils.createLogConfig()
     // simulate a case where log data does not exist but the start offset is 
non-zero
     val log = createLog(logDir, logConfig, logStartOffset = 500)
     assertEquals(500, log.logStartOffset)
@@ -749,7 +749,7 @@ class LogTest {
 
   @Test
   def testLogEndLessThanStartAfterReopen(): Unit = {
-    val logConfig = LogTest.createLogConfig()
+    val logConfig = LogTestUtils.createLogConfig()
     var log = createLog(logDir, logConfig)
     for (i <- 0 until 5) {
       val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes)
@@ -782,7 +782,7 @@ class LogTest {
 
   @Test
   def testNonActiveSegmentsFrom(): Unit = {
-    val logConfig = LogTest.createLogConfig()
+    val logConfig = LogTestUtils.createLogConfig()
     val log = createLog(logDir, logConfig)
 
     for (i <- 0 until 5) {
@@ -804,7 +804,7 @@ class LogTest {
 
   @Test
   def testInconsistentLogSegmentRange(): Unit = {
-    val logConfig = LogTest.createLogConfig()
+    val logConfig = LogTestUtils.createLogConfig()
     val log = createLog(logDir, logConfig)
 
     for (i <- 0 until 5) {
@@ -818,7 +818,7 @@ class LogTest {
 
   @Test
   def testLogDelete(): Unit = {
-    val logConfig = LogTest.createLogConfig()
+    val logConfig = LogTestUtils.createLogConfig()
     val log = createLog(logDir, logConfig)
 
     for (i <- 0 to 100) {
@@ -846,7 +846,7 @@ class LogTest {
     val scheduler = new KafkaScheduler(1)
     try {
       scheduler.startup()
-      val logConfig = LogTest.createLogConfig()
+      val logConfig = LogTestUtils.createLogConfig()
       val log = createLog(logDir, logConfig, scheduler = scheduler)
 
       val producerExpireCheck = log.producerExpireCheck
@@ -875,7 +875,7 @@ class LogTest {
 
   @Test
   def testProducerIdMapOffsetUpdatedForNonIdempotentData(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
     val log = createLog(logDir, logConfig)
     val records = TestUtils.records(List(new 
SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)))
     log.appendAsLeader(records, leaderEpoch = 0)
@@ -885,7 +885,7 @@ class LogTest {
 
   @Test
   def testRebuildProducerIdMapWithCompactedData(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
     val log = createLog(logDir, logConfig)
     val pid = 1L
     val epoch = 0.toShort
@@ -928,7 +928,7 @@ class LogTest {
 
   @Test
   def testRebuildProducerStateWithEmptyCompactedBatch(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
     val log = createLog(logDir, logConfig)
     val pid = 1L
     val epoch = 0.toShort
@@ -969,7 +969,7 @@ class LogTest {
 
   @Test
   def testUpdateProducerIdMapWithCompactedData(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
     val log = createLog(logDir, logConfig)
     val pid = 1L
     val epoch = 0.toShort
@@ -1002,7 +1002,7 @@ class LogTest {
 
   @Test
   def testProducerIdMapTruncateTo(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
     val log = createLog(logDir, logConfig)
     log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord("a".getBytes))), leaderEpoch = 0)
     log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord("b".getBytes))), leaderEpoch = 0)
@@ -1027,7 +1027,7 @@ class LogTest {
   @Test
   def testProducerIdMapTruncateToWithNoSnapshots(): Unit = {
     // This ensures that the upgrade optimization path cannot be hit after 
initial loading
-    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
     val log = createLog(logDir, logConfig)
     val pid = 1L
     val epoch = 0.toShort
@@ -1051,7 +1051,7 @@ class LogTest {
 
   @Test
   def testRetentionDeletesProducerStateSnapshots(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, 
retentionBytes = 0, retentionMs = 1000 * 60, fileDeleteDelayMs = 0)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5, 
retentionBytes = 0, retentionMs = 1000 * 60, fileDeleteDelayMs = 0)
     val log = createLog(logDir, logConfig)
     val pid1 = 1L
     val epoch = 0.toShort
@@ -1079,7 +1079,7 @@ class LogTest {
 
   @Test
   def testRetentionIdempotency(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, 
retentionBytes = -1, retentionMs = 900, fileDeleteDelayMs = 0)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5, 
retentionBytes = -1, retentionMs = 900, fileDeleteDelayMs = 0)
     val log = createLog(logDir, logConfig)
 
     log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord(mockTime.milliseconds() + 100, "a".getBytes))), leaderEpoch = 0)
@@ -1100,7 +1100,7 @@ class LogTest {
 
   @Test
   def testLogStartOffsetMovementDeletesSnapshots(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, 
retentionBytes = -1, fileDeleteDelayMs = 0)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5, 
retentionBytes = -1, fileDeleteDelayMs = 0)
     val log = createLog(logDir, logConfig)
     val pid1 = 1L
     val epoch = 0.toShort
@@ -1127,7 +1127,7 @@ class LogTest {
 
   @Test
   def testCompactionDeletesProducerStateSnapshots(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, 
cleanupPolicy = LogConfig.Compact, fileDeleteDelayMs = 0)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5, 
cleanupPolicy = LogConfig.Compact, fileDeleteDelayMs = 0)
     val log = createLog(logDir, logConfig)
     val pid1 = 1L
     val epoch = 0.toShort
@@ -1171,7 +1171,7 @@ class LogTest {
   def testLoadingLogDeletesProducerStateSnapshotsPastLogEndOffset(): Unit = {
     val straySnapshotFile = Log.producerSnapshotFile(logDir, 42).toPath
     Files.createFile(straySnapshotFile)
-    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, 
retentionBytes = -1, fileDeleteDelayMs = 0)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5, 
retentionBytes = -1, fileDeleteDelayMs = 0)
     createLog(logDir, logConfig)
     assertEquals(0, ProducerStateManager.listSnapshotFiles(logDir).size,
       "expected producer state snapshots greater than the log end offset to be 
cleaned up")
@@ -1180,7 +1180,7 @@ class LogTest {
   @Test
   def testProducerIdMapTruncateFullyAndStartAt(): Unit = {
     val records = TestUtils.singletonRecords("foo".getBytes)
-    val logConfig = LogTest.createLogConfig(segmentBytes = 
records.sizeInBytes, retentionBytes = records.sizeInBytes * 2)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
records.sizeInBytes, retentionBytes = records.sizeInBytes * 2)
     val log = createLog(logDir, logConfig)
     log.appendAsLeader(records, leaderEpoch = 0)
     log.takeProducerSnapshot()
@@ -1203,7 +1203,7 @@ class LogTest {
   def testProducerIdExpirationOnSegmentDeletion(): Unit = {
     val pid1 = 1L
     val records = TestUtils.records(Seq(new SimpleRecord("foo".getBytes)), 
producerId = pid1, producerEpoch = 0, sequence = 0)
-    val logConfig = LogTest.createLogConfig(segmentBytes = 
records.sizeInBytes, retentionBytes = records.sizeInBytes * 2)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
records.sizeInBytes, retentionBytes = records.sizeInBytes * 2)
     val log = createLog(logDir, logConfig)
     log.appendAsLeader(records, leaderEpoch = 0)
     log.takeProducerSnapshot()
@@ -1228,7 +1228,7 @@ class LogTest {
 
   @Test
   def testTakeSnapshotOnRollAndDeleteSnapshotOnRecoveryPointCheckpoint(): Unit 
= {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
     val log = createLog(logDir, logConfig)
     log.appendAsLeader(TestUtils.singletonRecords("a".getBytes), leaderEpoch = 
0)
     log.roll(Some(1L))
@@ -1256,7 +1256,7 @@ class LogTest {
   @Test
   def testProducerSnapshotAfterSegmentRollOnAppend(): Unit = {
     val producerId = 1L
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1024)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024)
     val log = createLog(logDir, logConfig)
 
     log.appendAsLeader(TestUtils.records(Seq(new 
SimpleRecord(mockTime.milliseconds(), new Array[Byte](512))),
@@ -1288,7 +1288,7 @@ class LogTest {
 
   @Test
   def testRebuildTransactionalState(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
     val log = createLog(logDir, logConfig)
 
     val pid = 137L
@@ -1320,7 +1320,7 @@ class LogTest {
     val producerIdExpirationCheckIntervalMs = 100
 
     val pid = 23L
-    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
     val log = createLog(logDir, logConfig, maxProducerIdExpirationMs = 
maxProducerIdExpirationMs,
       producerIdExpirationCheckIntervalMs = 
producerIdExpirationCheckIntervalMs)
     val records = Seq(new SimpleRecord(mockTime.milliseconds(), 
"foo".getBytes))
@@ -1459,7 +1459,7 @@ class LogTest {
 
   @Test
   def testDuplicateAppendToFollower(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
     val log = createLog(logDir, logConfig)
     val epoch: Short = 0
     val pid = 1L
@@ -1480,7 +1480,7 @@ class LogTest {
 
   @Test
   def testMultipleProducersWithDuplicatesInSingleAppend(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
     val log = createLog(logDir, logConfig)
 
     val pid1 = 1L
@@ -1547,7 +1547,7 @@ class LogTest {
 
   @Test
   def testDeleteSnapshotsOnIncrementLogStartOffset(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
     val log = createLog(logDir, logConfig)
     val pid1 = 1L
     val pid2 = 2L
@@ -1585,7 +1585,7 @@ class LogTest {
     var set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = 
mockTime.milliseconds)
     val maxJitter = 20 * 60L
     // create a log
-    val logConfig = LogTest.createLogConfig(segmentMs = 1 * 60 * 60L, 
segmentJitterMs = maxJitter)
+    val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L, 
segmentJitterMs = maxJitter)
     val log = createLog(logDir, logConfig)
     assertEquals(1, log.numberOfSegments, "Log begins with a single empty 
segment.")
     log.appendAsLeader(set, leaderEpoch = 0)
@@ -1612,7 +1612,7 @@ class LogTest {
     val msgPerSeg = 10
     val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 
messages
     // create a log
-    val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = segmentSize)
     val log = createLog(logDir, logConfig)
     assertEquals(1, log.numberOfSegments, "There should be exactly 1 segment.")
 
@@ -1638,7 +1638,7 @@ class LogTest {
    */
   @Test
   def testAppendAndReadWithSequentialOffsets(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 71)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 71)
     val log = createLog(logDir, logConfig)
     val values = (0 until 100 by 2).map(id => id.toString.getBytes).toArray
 
@@ -1662,7 +1662,7 @@ class LogTest {
    */
   @Test
   def testAppendAndReadWithNonSequentialOffsets(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 72)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 72)
     val log = createLog(logDir, logConfig)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
@@ -1686,7 +1686,7 @@ class LogTest {
    */
   @Test
   def testReadAtLogGap(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 300)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 300)
     val log = createLog(logDir, logConfig)
 
     // keep appending until we have two segments with only a single message in 
the second segment
@@ -1702,7 +1702,7 @@ class LogTest {
 
   @Test
   def testLogRollAfterLogHandlerClosed(): Unit = {
-    val logConfig = LogTest.createLogConfig()
+    val logConfig = LogTestUtils.createLogConfig()
     val log = createLog(logDir,  logConfig)
     log.closeHandlers()
     assertThrows(classOf[KafkaStorageException], () => log.roll(Some(1L)))
@@ -1710,7 +1710,7 @@ class LogTest {
 
   @Test
   def testReadWithMinMessage(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 72)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 72)
     val log = createLog(logDir,  logConfig)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
@@ -1735,7 +1735,7 @@ class LogTest {
 
   @Test
   def testReadWithTooSmallMaxLength(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 72)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 72)
     val log = createLog(logDir,  logConfig)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
@@ -1769,7 +1769,7 @@ class LogTest {
   def testReadOutOfRange(): Unit = {
     createEmptyLogs(logDir, 1024)
     // set up replica log starting with offset 1024 and with one message (at 
offset 1024)
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1024)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024)
     val log = createLog(logDir, logConfig)
     log.appendAsLeader(TestUtils.singletonRecords(value = "42".getBytes), 
leaderEpoch = 0)
 
@@ -1787,7 +1787,7 @@ class LogTest {
   @Test
   def testLogRolls(): Unit = {
     /* create a multipart log with 100 messages */
-    val logConfig = LogTest.createLogConfig(segmentBytes = 100)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 100)
     val log = createLog(logDir, logConfig)
     val numMessages = 100
     val messageSets = (0 until numMessages).map(i => 
TestUtils.singletonRecords(value = i.toString.getBytes,
@@ -1824,7 +1824,7 @@ class LogTest {
   @Test
   def testCompressedMessages(): Unit = {
     /* this log should roll after every messageset */
-    val logConfig = LogTest.createLogConfig(segmentBytes = 110)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 110)
     val log = createLog(logDir, logConfig)
 
     /* append 2 compressed message sets, each with two messages giving offsets 
0, 1, 2, 3 */
@@ -1848,7 +1848,7 @@ class LogTest {
     for(messagesToAppend <- List(0, 1, 25)) {
       logDir.mkdirs()
       // first test a log segment starting at 0
-      val logConfig = LogTest.createLogConfig(segmentBytes = 100, retentionMs 
= 0)
+      val logConfig = LogTestUtils.createLogConfig(segmentBytes = 100, 
retentionMs = 0)
       val log = createLog(logDir, logConfig)
       for(i <- 0 until messagesToAppend)
         log.appendAsLeader(TestUtils.singletonRecords(value = 
i.toString.getBytes, timestamp = mockTime.milliseconds - 10), leaderEpoch = 0)
@@ -1886,7 +1886,7 @@ class LogTest {
     val messageSet = MemoryRecords.withRecords(CompressionType.NONE, new 
SimpleRecord("You".getBytes), new SimpleRecord("bethe".getBytes))
     // append messages to log
     val configSegmentSize = messageSet.sizeInBytes - 1
-    val logConfig = LogTest.createLogConfig(segmentBytes = configSegmentSize)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
configSegmentSize)
     val log = createLog(logDir, logConfig)
 
     assertThrows(classOf[RecordBatchTooLargeException], () => 
log.appendAsLeader(messageSet, leaderEpoch = 0))
@@ -1906,7 +1906,7 @@ class LogTest {
     val messageSetWithKeyedMessage = 
MemoryRecords.withRecords(CompressionType.NONE, keyedMessage)
     val messageSetWithKeyedMessages = 
MemoryRecords.withRecords(CompressionType.NONE, keyedMessage, 
anotherKeyedMessage)
 
-    val logConfig = LogTest.createLogConfig(cleanupPolicy = LogConfig.Compact)
+    val logConfig = LogTestUtils.createLogConfig(cleanupPolicy = 
LogConfig.Compact)
     val log = createLog(logDir, logConfig)
 
     val errorMsgPrefix = "Compacted topic cannot accept message without key"
@@ -1955,7 +1955,7 @@ class LogTest {
 
     // append messages to log
     val maxMessageSize = second.sizeInBytes - 1
-    val logConfig = LogTest.createLogConfig(maxMessageBytes = maxMessageSize)
+    val logConfig = LogTestUtils.createLogConfig(maxMessageBytes = 
maxMessageSize)
     val log = createLog(logDir, logConfig)
 
     // should be able to append the small message
@@ -1973,7 +1973,7 @@ class LogTest {
       new SimpleRecord("change (I need more bytes)... blah blah 
blah.".getBytes),
       new SimpleRecord("More padding boo hoo".getBytes))
 
-    val log = createLog(logDir, LogTest.createLogConfig(maxMessageBytes = 
second.sizeInBytes - 1))
+    val log = createLog(logDir, LogTestUtils.createLogConfig(maxMessageBytes = 
second.sizeInBytes - 1))
 
     log.appendAsFollower(first)
     // the second record is larger then limit but appendAsFollower does not 
validate the size.
@@ -1982,7 +1982,7 @@ class LogTest {
 
   @Test
   def testLogRecoversTopicId(): Unit = {
-    val logConfig = LogTest.createLogConfig()
+    val logConfig = LogTestUtils.createLogConfig()
     var log = createLog(logDir, logConfig)
 
     val topicId = Uuid.randomUuid()
@@ -1998,7 +1998,7 @@ class LogTest {
 
   @Test
   def testLogFailsWhenInconsistentTopicIdSet(): Unit = {
-    val logConfig = LogTest.createLogConfig()
+    val logConfig = LogTestUtils.createLogConfig()
     var log = createLog(logDir, logConfig)
 
     val topicId = Uuid.randomUuid()
@@ -2020,7 +2020,7 @@ class LogTest {
   @Test
   def testBuildTimeIndexWhenNotAssigningOffsets(): Unit = {
     val numMessages = 100
-    val logConfig = LogTest.createLogConfig(segmentBytes = 10000, 
indexIntervalBytes = 1)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 10000, 
indexIntervalBytes = 1)
     val log = createLog(logDir, logConfig)
 
     val messages = (0 until numMessages).map { i =>
@@ -2035,7 +2035,7 @@ class LogTest {
 
   @Test
   def testFetchOffsetByTimestampIncludesLeaderEpoch(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 200, 
indexIntervalBytes = 1)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, 
indexIntervalBytes = 1)
     val log = createLog(logDir, logConfig)
 
     assertEquals(None, log.fetchOffsetByTimestamp(0L))
@@ -2083,7 +2083,7 @@ class LogTest {
     val segmentSize = msgPerSeg * setSize  // each segment will be 10 messages
 
     // create a log
-    val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = segmentSize)
     val log = createLog(logDir, logConfig)
     assertEquals(1, log.numberOfSegments, "There should be exactly 1 segment.")
 
@@ -2135,7 +2135,7 @@ class LogTest {
     val setSize = TestUtils.singletonRecords(value = "test".getBytes, 
timestamp = mockTime.milliseconds).sizeInBytes
     val msgPerSeg = 10
     val segmentSize = msgPerSeg * setSize  // each segment will be 10 messages
-    val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize, 
indexIntervalBytes = setSize - 1)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = segmentSize, 
indexIntervalBytes = setSize - 1)
     val log = createLog(logDir, logConfig)
     assertEquals(1, log.numberOfSegments, "There should be exactly 1 segment.")
 
@@ -2175,7 +2175,7 @@ class LogTest {
   def testAsyncDelete(): Unit = {
     def createRecords = TestUtils.singletonRecords(value = "test".getBytes, 
timestamp = mockTime.milliseconds - 1000L)
     val asyncDeleteMs = 1000
-    val logConfig = LogTest.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 
10000,
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 
10000,
                                     retentionMs = 999, fileDeleteDelayMs = 
asyncDeleteMs)
     val log = createLog(logDir, logConfig)
 
@@ -2299,7 +2299,7 @@ class LogTest {
 
   @Test
   def testWriteLeaderEpochCheckpointAfterDirectoryRename(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1000, 
indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1000, 
indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
     val log = createLog(logDir, logConfig)
     log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
     assertEquals(Some(5), log.latestEpoch)
@@ -2315,7 +2315,7 @@ class LogTest {
 
   @Test
   def testTopicIdTransfersAfterDirectoryRename(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1000, 
indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1000, 
indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
     val log = createLog(logDir, logConfig)
 
     // Write a topic ID to the partition metadata file to ensure it is 
transferred correctly.
@@ -2342,7 +2342,7 @@ class LogTest {
 
   @Test
   def testLeaderEpochCacheClearedAfterDowngradeInAppendedMessages(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1000, 
indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1000, 
indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
     val log = createLog(logDir, logConfig)
     log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
     assertEquals(Some(5), log.leaderEpochCache.flatMap(_.latestEpoch))
@@ -2355,12 +2355,12 @@ class LogTest {
 
   @Test
   def testLeaderEpochCacheClearedAfterDynamicMessageFormatDowngrade(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1000, 
indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1000, 
indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
     val log = createLog(logDir, logConfig)
     log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
     assertEquals(Some(5), log.latestEpoch)
 
-    val downgradedLogConfig = LogTest.createLogConfig(segmentBytes = 1000, 
indexIntervalBytes = 1,
+    val downgradedLogConfig = LogTestUtils.createLogConfig(segmentBytes = 
1000, indexIntervalBytes = 1,
       maxMessageBytes = 64 * 1024, messageFormatVersion = 
kafka.api.KAFKA_0_10_2_IV0.shortVersion)
     log.updateConfig(downgradedLogConfig)
     LogTestUtils.assertLeaderEpochCacheEmpty(log)
@@ -2372,14 +2372,14 @@ class LogTest {
 
   @Test
   def testLeaderEpochCacheCreatedAfterMessageFormatUpgrade(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1000, 
indexIntervalBytes = 1,
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1000, 
indexIntervalBytes = 1,
       maxMessageBytes = 64 * 1024, messageFormatVersion = 
kafka.api.KAFKA_0_10_2_IV0.shortVersion)
     val log = createLog(logDir, logConfig)
     log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord("bar".getBytes())),
       magicValue = RecordVersion.V1.value), leaderEpoch = 5)
     LogTestUtils.assertLeaderEpochCacheEmpty(log)
 
-    val upgradedLogConfig = LogTest.createLogConfig(segmentBytes = 1000, 
indexIntervalBytes = 1,
+    val upgradedLogConfig = LogTestUtils.createLogConfig(segmentBytes = 1000, 
indexIntervalBytes = 1,
       maxMessageBytes = 64 * 1024, messageFormatVersion = 
kafka.api.KAFKA_0_11_0_IV0.shortVersion)
     log.updateConfig(upgradedLogConfig)
     log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
@@ -2391,9 +2391,9 @@ class LogTest {
   @Test
   def testSplitOnOffsetOverflow(): Unit = {
     // create a log such that one log segment has offsets that overflow, and 
call the split API on that segment
-    val logConfig = LogTest.createLogConfig(indexIntervalBytes = 1, 
fileDeleteDelayMs = 1000)
+    val logConfig = LogTestUtils.createLogConfig(indexIntervalBytes = 1, 
fileDeleteDelayMs = 1000)
     val (log, segmentWithOverflow) = createLogWithOffsetOverflow(logConfig)
-    assertTrue(LogTest.hasOffsetOverflow(log), "At least one segment must have 
offset overflow")
+    assertTrue(LogTestUtils.hasOffsetOverflow(log), "At least one segment must 
have offset overflow")
 
     val allRecordsBeforeSplit = LogTest.allRecords(log)
 
@@ -2405,7 +2405,7 @@ class LogTest {
     LogTest.verifyRecordsInLog(log, allRecordsBeforeSplit)
 
     // verify we do not have offset overflow anymore
-    assertFalse(LogTest.hasOffsetOverflow(log))
+    assertFalse(LogTestUtils.hasOffsetOverflow(log))
   }
 
   @Test
@@ -2437,17 +2437,17 @@ class LogTest {
   }
 
   private def testDegenerateSplitSegmentWithOverflow(segmentBaseOffset: Long, 
records: List[MemoryRecords]): Unit = {
-    val segment = LogTest.rawSegment(logDir, segmentBaseOffset)
+    val segment = LogTestUtils.rawSegment(logDir, segmentBaseOffset)
     // Need to create the offset files explicitly to avoid triggering segment 
recovery to truncate segment.
     Log.offsetIndexFile(logDir, segmentBaseOffset).createNewFile()
     Log.timeIndexFile(logDir, segmentBaseOffset).createNewFile()
     records.foreach(segment.append _)
     segment.close()
 
-    val logConfig = LogTest.createLogConfig(indexIntervalBytes = 1, 
fileDeleteDelayMs = 1000)
+    val logConfig = LogTestUtils.createLogConfig(indexIntervalBytes = 1, 
fileDeleteDelayMs = 1000)
     val log = createLog(logDir, logConfig, recoveryPoint = Long.MaxValue)
 
-    val segmentWithOverflow = LogTest.firstOverflowSegment(log).getOrElse {
+    val segmentWithOverflow = LogTestUtils.firstOverflowSegment(log).getOrElse 
{
       throw new AssertionError("Failed to create log with a segment which has 
overflowed offsets")
     }
 
@@ -2460,7 +2460,7 @@ class LogTest {
     assertEquals(firstBatchBaseOffset, log.activeSegment.baseOffset)
     LogTest.verifyRecordsInLog(log, allRecordsBeforeSplit)
 
-    assertFalse(LogTest.hasOffsetOverflow(log))
+    assertFalse(LogTestUtils.hasOffsetOverflow(log))
   }
 
   @Test
@@ -2573,7 +2573,7 @@ class LogTest {
   @Test
   def testDeleteOldSegments(): Unit = {
     def createRecords = TestUtils.singletonRecords(value = "test".getBytes, 
timestamp = mockTime.milliseconds - 1000)
-    val logConfig = LogTest.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
     val log = createLog(logDir, logConfig)
 
     // append some messages to create some segments
@@ -2623,7 +2623,7 @@ class LogTest {
   @Test
   def testLogDeletionAfterClose(): Unit = {
     def createRecords = TestUtils.singletonRecords(value = "test".getBytes, 
timestamp = mockTime.milliseconds - 1000)
-    val logConfig = LogTest.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
     val log = createLog(logDir, logConfig)
 
     // append some messages to create some segments
@@ -2641,7 +2641,7 @@ class LogTest {
   @Test
   def testLogDeletionAfterDeleteRecords(): Unit = {
     def createRecords = TestUtils.singletonRecords("test".getBytes)
-    val logConfig = LogTest.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5)
     val log = createLog(logDir, logConfig)
 
     for (_ <- 0 until 15)
@@ -2673,7 +2673,7 @@ class LogTest {
   @Test
   def shouldDeleteSizeBasedSegments(): Unit = {
     def createRecords = TestUtils.singletonRecords("test".getBytes)
-    val logConfig = LogTest.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10)
     val log = createLog(logDir, logConfig)
 
     // append some messages to create some segments
@@ -2688,7 +2688,7 @@ class LogTest {
   @Test
   def shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize(): Unit = {
     def createRecords = TestUtils.singletonRecords("test".getBytes)
-    val logConfig = LogTest.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 15)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 15)
     val log = createLog(logDir, logConfig)
 
     // append some messages to create some segments
@@ -2703,7 +2703,7 @@ class LogTest {
   @Test
   def shouldDeleteTimeBasedSegmentsReadyToBeDeleted(): Unit = {
     def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp 
= 10)
-    val logConfig = LogTest.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, retentionMs = 10000)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, retentionMs = 10000)
     val log = createLog(logDir, logConfig)
 
     // append some messages to create some segments
@@ -2718,7 +2718,7 @@ class LogTest {
   @Test
   def shouldNotDeleteTimeBasedSegmentsWhenNoneReadyToBeDeleted(): Unit = {
     def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp 
= mockTime.milliseconds)
-    val logConfig = LogTest.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, retentionMs = 10000000)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, retentionMs = 10000000)
     val log = createLog(logDir, logConfig)
 
     // append some messages to create some segments
@@ -2733,7 +2733,7 @@ class LogTest {
   @Test
   def shouldNotDeleteSegmentsWhenPolicyDoesNotIncludeDelete(): Unit = {
     def createRecords = TestUtils.singletonRecords("test".getBytes, key = 
"test".getBytes(), timestamp = 10L)
-    val logConfig = LogTest.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = "compact")
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = "compact")
     val log = createLog(logDir, logConfig)
 
     // append some messages to create some segments
@@ -2752,7 +2752,7 @@ class LogTest {
   @Test
   def 
shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete(): Unit 
= {
     def createRecords = TestUtils.singletonRecords("test".getBytes, key = 
"test".getBytes, timestamp = 10L)
-    val logConfig = LogTest.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = 
"compact,delete")
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = 
"compact,delete")
     val log = createLog(logDir, logConfig)
 
     // append some messages to create some segments
@@ -2768,7 +2768,7 @@ class LogTest {
   def shouldDeleteStartOffsetBreachedSegmentsWhenPolicyDoesNotIncludeDelete(): 
Unit = {
     def createRecords = TestUtils.singletonRecords("test".getBytes, key = 
"test".getBytes, timestamp = 10L)
     val recordsPerSegment = 5
-    val logConfig = LogTest.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * recordsPerSegment, retentionMs = 10000, 
cleanupPolicy = "compact")
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * recordsPerSegment, retentionMs = 10000, 
cleanupPolicy = "compact")
     val log = createLog(logDir, logConfig, brokerTopicStats)
 
     // append some messages to create some segments
@@ -2840,7 +2840,7 @@ class LogTest {
   @Test
   def shouldTruncateLeaderEpochsWhenDeletingSegments(): Unit = {
     def createRecords = TestUtils.singletonRecords("test".getBytes)
-    val logConfig = LogTest.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10)
     val log = createLog(logDir, logConfig)
     val cache = epochCache(log)
 
@@ -2865,7 +2865,7 @@ class LogTest {
   @Test
   def shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments(): Unit = {
     def createRecords = TestUtils.singletonRecords("test".getBytes)
-    val logConfig = LogTest.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10)
     val log = createLog(logDir, logConfig)
     val cache = epochCache(log)
 
@@ -2894,7 +2894,7 @@ class LogTest {
         baseOffset = startOffset, partitionLeaderEpoch = epoch)
     }
 
-    val logConfig = LogTest.createLogConfig(segmentBytes = 10 * 
createRecords(0, 0).sizeInBytes)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 10 * 
createRecords(0, 0).sizeInBytes)
     val log = createLog(logDir, logConfig)
     val cache = epochCache(log)
 
@@ -2938,7 +2938,7 @@ class LogTest {
 
   @Test
   def testFirstUnstableOffsetNoTransactionalData(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
     val log = createLog(logDir, logConfig)
 
     val records = MemoryRecords.withRecords(CompressionType.NONE,
@@ -2952,7 +2952,7 @@ class LogTest {
 
   @Test
   def testFirstUnstableOffsetWithTransactionalData(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
     val log = createLog(logDir, logConfig)
 
     val pid = 137L
@@ -2989,7 +2989,7 @@ class LogTest {
 
   @Test
   def testReadCommittedWithConcurrentHighWatermarkUpdates(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
     val log = createLog(logDir, logConfig)
     val lastOffset = 50L
 
@@ -3042,7 +3042,7 @@ class LogTest {
 
   @Test
   def testTransactionIndexUpdated(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
     val log = createLog(logDir, logConfig)
     val epoch = 0.toShort
 
@@ -3098,7 +3098,7 @@ class LogTest {
   @Test
   def testTransactionIndexUpdatedThroughReplication(): Unit = {
     val epoch = 0.toShort
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
     val log = createLog(logDir, logConfig)
     val buffer = ByteBuffer.allocate(2048)
 
@@ -3189,7 +3189,7 @@ class LogTest {
   def testZombieCoordinatorFenced(): Unit = {
     val pid = 1L
     val epoch = 0.toShort
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
     val log = createLog(logDir, logConfig)
 
     val append = LogTestUtils.appendTransactionalAsLeader(log, pid, epoch, 
mockTime)
@@ -3209,7 +3209,7 @@ class LogTest {
   def testZombieCoordinatorFencedEmptyTransaction(): Unit = {
     val pid = 1L
     val epoch = 0.toShort
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
     val log = createLog(logDir, logConfig)
 
     val buffer = ByteBuffer.allocate(256)
@@ -3230,7 +3230,7 @@ class LogTest {
   def testEndTxnWithFencedProducerEpoch(): Unit = {
     val producerId = 1L
     val epoch = 5.toShort
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
     val log = createLog(logDir, logConfig)
     LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId, epoch, 
ControlRecordType.ABORT, mockTime.milliseconds(), coordinatorEpoch = 1)
 
@@ -3240,7 +3240,7 @@ class LogTest {
 
   @Test
   def testLastStableOffsetDoesNotExceedLogStartOffsetMidSegment(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
     val log = createLog(logDir, logConfig)
     val epoch = 0.toShort
     val pid = 1L
@@ -3265,7 +3265,7 @@ class LogTest {
 
   @Test
   def testLastStableOffsetDoesNotExceedLogStartOffsetAfterSegmentDeletion(): 
Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
     val log = createLog(logDir, logConfig)
     val epoch = 0.toShort
     val pid = 1L
@@ -3295,7 +3295,7 @@ class LogTest {
   def testAppendToTransactionIndexFailure(): Unit = {
     val pid = 1L
     val epoch = 0.toShort
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
     val log = createLog(logDir, logConfig)
 
     val append = LogTestUtils.appendTransactionalAsLeader(log, pid, epoch, 
mockTime)
@@ -3335,7 +3335,7 @@ class LogTest {
 
   @Test
   def testOffsetSnapshot(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
     val log = createLog(logDir, logConfig)
 
     // append a few records
@@ -3357,7 +3357,7 @@ class LogTest {
 
   @Test
   def testLastStableOffsetWithMixedProducerData(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
     val log = createLog(logDir, logConfig)
 
     // for convenience, both producers share the same epoch
@@ -3416,7 +3416,7 @@ class LogTest {
       new SimpleRecord("b".getBytes),
       new SimpleRecord("c".getBytes))
 
-    val logConfig = LogTest.createLogConfig(segmentBytes = records.sizeInBytes)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
records.sizeInBytes)
     val log = createLog(logDir, logConfig)
 
     val firstAppendInfo = log.appendAsLeader(records, leaderEpoch = 0)
@@ -3453,7 +3453,7 @@ class LogTest {
     val dirName = Log.logDeleteDirName(new TopicPartition("foo", 3))
     val logDir = new File(tmpDir, dirName)
     logDir.mkdirs()
-    val logConfig = LogTest.createLogConfig()
+    val logConfig = LogTestUtils.createLogConfig()
     val log = createLog(logDir, logConfig)
     assertEquals(1, log.numberOfSegments)
   }
@@ -3511,15 +3511,15 @@ class LogTest {
                         producerIdExpirationCheckIntervalMs: Int = 
LogManager.ProducerIdExpirationCheckIntervalMs,
                         lastShutdownClean: Boolean = true,
                         topicId: Option[Uuid] = None): Log = {
-    LogTest.createLog(dir, config, brokerTopicStats, scheduler, time, 
logStartOffset, recoveryPoint,
+    LogTestUtils.createLog(dir, config, brokerTopicStats, scheduler, time, 
logStartOffset, recoveryPoint,
       maxProducerIdExpirationMs, producerIdExpirationCheckIntervalMs, 
lastShutdownClean, topicId = topicId)
   }
 
   private def createLogWithOffsetOverflow(logConfig: LogConfig): (Log, 
LogSegment) = {
-    LogTest.initializeLogDirWithOverflowedSegment(logDir)
+    LogTestUtils.initializeLogDirWithOverflowedSegment(logDir)
 
     val log = createLog(logDir, logConfig, recoveryPoint = Long.MaxValue)
-    val segmentWithOverflow = LogTest.firstOverflowSegment(log).getOrElse {
+    val segmentWithOverflow = LogTestUtils.firstOverflowSegment(log).getOrElse 
{
       throw new AssertionError("Failed to create log with a segment which has 
overflowed offsets")
     }
 
@@ -3528,126 +3528,6 @@ class LogTest {
 }
 
 object LogTest {
-  def createLogConfig(segmentMs: Long = Defaults.SegmentMs,
-                      segmentBytes: Int = Defaults.SegmentSize,
-                      retentionMs: Long = Defaults.RetentionMs,
-                      retentionBytes: Long = Defaults.RetentionSize,
-                      segmentJitterMs: Long = Defaults.SegmentJitterMs,
-                      cleanupPolicy: String = Defaults.CleanupPolicy,
-                      maxMessageBytes: Int = Defaults.MaxMessageSize,
-                      indexIntervalBytes: Int = Defaults.IndexInterval,
-                      segmentIndexBytes: Int = Defaults.MaxIndexSize,
-                      messageFormatVersion: String = 
Defaults.MessageFormatVersion,
-                      fileDeleteDelayMs: Long = Defaults.FileDeleteDelayMs): 
LogConfig = {
-    val logProps = new Properties()
-
-    logProps.put(LogConfig.SegmentMsProp, segmentMs: java.lang.Long)
-    logProps.put(LogConfig.SegmentBytesProp, segmentBytes: Integer)
-    logProps.put(LogConfig.RetentionMsProp, retentionMs: java.lang.Long)
-    logProps.put(LogConfig.RetentionBytesProp, retentionBytes: java.lang.Long)
-    logProps.put(LogConfig.SegmentJitterMsProp, segmentJitterMs: 
java.lang.Long)
-    logProps.put(LogConfig.CleanupPolicyProp, cleanupPolicy)
-    logProps.put(LogConfig.MaxMessageBytesProp, maxMessageBytes: Integer)
-    logProps.put(LogConfig.IndexIntervalBytesProp, indexIntervalBytes: Integer)
-    logProps.put(LogConfig.SegmentIndexBytesProp, segmentIndexBytes: Integer)
-    logProps.put(LogConfig.MessageFormatVersionProp, messageFormatVersion)
-    logProps.put(LogConfig.FileDeleteDelayMsProp, fileDeleteDelayMs: 
java.lang.Long)
-    LogConfig(logProps)
-  }
-
-  def createLog(dir: File,
-                config: LogConfig,
-                brokerTopicStats: BrokerTopicStats,
-                scheduler: Scheduler,
-                time: Time,
-                logStartOffset: Long = 0L,
-                recoveryPoint: Long = 0L,
-                maxProducerIdExpirationMs: Int = 60 * 60 * 1000,
-                producerIdExpirationCheckIntervalMs: Int = 
LogManager.ProducerIdExpirationCheckIntervalMs,
-                lastShutdownClean: Boolean = true,
-                topicId: Option[Uuid] = None): Log = {
-    Log(dir = dir,
-      config = config,
-      logStartOffset = logStartOffset,
-      recoveryPoint = recoveryPoint,
-      scheduler = scheduler,
-      brokerTopicStats = brokerTopicStats,
-      time = time,
-      maxProducerIdExpirationMs = maxProducerIdExpirationMs,
-      producerIdExpirationCheckIntervalMs = 
producerIdExpirationCheckIntervalMs,
-      logDirFailureChannel = new LogDirFailureChannel(10),
-      lastShutdownClean = lastShutdownClean,
-      topicId = topicId,
-      keepPartitionMetadataFile = true)
-  }
-
-  /**
-   * Check if the given log contains any segment with records that cause 
offset overflow.
-   * @param log Log to check
-   * @return true if log contains at least one segment with offset overflow; 
false otherwise
-   */
-  def hasOffsetOverflow(log: Log): Boolean = 
firstOverflowSegment(log).isDefined
-
-  def firstOverflowSegment(log: Log): Option[LogSegment] = {
-    def hasOverflow(baseOffset: Long, batch: RecordBatch): Boolean =
-      batch.lastOffset > baseOffset + Int.MaxValue || batch.baseOffset < 
baseOffset
-
-    for (segment <- log.logSegments) {
-      val overflowBatch = segment.log.batches.asScala.find(batch => 
hasOverflow(segment.baseOffset, batch))
-      if (overflowBatch.isDefined)
-        return Some(segment)
-    }
-    None
-  }
-
-  private def rawSegment(logDir: File, baseOffset: Long): FileRecords =
-    FileRecords.open(Log.logFile(logDir, baseOffset))
-
-  /**
-   * Initialize the given log directory with a set of segments, one of which 
will have an
-   * offset which overflows the segment
-   */
-  def initializeLogDirWithOverflowedSegment(logDir: File): Unit = {
-    def writeSampleBatches(baseOffset: Long, segment: FileRecords): Long = {
-      def record(offset: Long) = {
-        val data = offset.toString.getBytes
-        new SimpleRecord(data, data)
-      }
-
-      segment.append(MemoryRecords.withRecords(baseOffset, 
CompressionType.NONE, 0,
-        record(baseOffset)))
-      segment.append(MemoryRecords.withRecords(baseOffset + 1, 
CompressionType.NONE, 0,
-        record(baseOffset + 1),
-        record(baseOffset + 2)))
-      segment.append(MemoryRecords.withRecords(baseOffset + Int.MaxValue - 1, 
CompressionType.NONE, 0,
-        record(baseOffset + Int.MaxValue - 1)))
-      // Need to create the offset files explicitly to avoid triggering 
segment recovery to truncate segment.
-      Log.offsetIndexFile(logDir, baseOffset).createNewFile()
-      Log.timeIndexFile(logDir, baseOffset).createNewFile()
-      baseOffset + Int.MaxValue
-    }
-
-    def writeNormalSegment(baseOffset: Long): Long = {
-      val segment = rawSegment(logDir, baseOffset)
-      try writeSampleBatches(baseOffset, segment)
-      finally segment.close()
-    }
-
-    def writeOverflowSegment(baseOffset: Long): Long = {
-      val segment = rawSegment(logDir, baseOffset)
-      try {
-        val nextOffset = writeSampleBatches(baseOffset, segment)
-        writeSampleBatches(nextOffset, segment)
-      } finally segment.close()
-    }
-
-    // We create three segments, the second of which contains offsets which 
overflow
-    var nextOffset = 0L
-    nextOffset = writeNormalSegment(nextOffset)
-    nextOffset = writeOverflowSegment(nextOffset)
-    writeNormalSegment(nextOffset)
-  }
-
   def allRecords(log: Log): List[Record] = {
     val recordsFound = ListBuffer[Record]()
     for (logSegment <- log.logSegments) {
@@ -3661,12 +3541,4 @@ object LogTest {
   def verifyRecordsInLog(log: Log, expectedRecords: List[Record]): Unit = {
     assertEquals(expectedRecords, allRecords(log))
   }
-
-  /* extract all the keys from a log */
-  def keysInLog(log: Log): Iterable[Long] = {
-    for (logSegment <- log.logSegments;
-         batch <- logSegment.log.batches.asScala if !batch.isControlBatch;
-         record <- batch.asScala if record.hasValue && record.hasKey)
-      yield TestUtils.readString(record.key).toLong
-  }
 }
diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala 
b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
index 29777b2..0169cd4 100644
--- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
@@ -119,7 +119,7 @@ object LogTestUtils {
     None
   }
 
-  private def rawSegment(logDir: File, baseOffset: Long): FileRecords =
+  def rawSegment(logDir: File, baseOffset: Long): FileRecords =
     FileRecords.open(Log.logFile(logDir, baseOffset))
 
   /**
@@ -185,8 +185,8 @@ object LogTestUtils {
       assertFalse(file.getName.endsWith(Log.CleanedFileSuffix), "Unexpected 
.cleaned file after recovery")
       assertFalse(file.getName.endsWith(Log.SwapFileSuffix), "Unexpected .swap 
file after recovery")
     }
-    assertEquals(expectedKeys, LogTest.keysInLog(recoveredLog))
-    assertFalse(LogTest.hasOffsetOverflow(recoveredLog))
+    assertEquals(expectedKeys, keysInLog(recoveredLog))
+    assertFalse(hasOffsetOverflow(recoveredLog))
     recoveredLog
   }
 
@@ -214,10 +214,10 @@ object LogTestUtils {
   }
 
   def readLog(log: Log,
-                      startOffset: Long,
-                      maxLength: Int,
-                      isolation: FetchIsolation = FetchLogEnd,
-                      minOneMessage: Boolean = true): FetchDataInfo = {
+             startOffset: Long,
+             maxLength: Int,
+             isolation: FetchIsolation = FetchLogEnd,
+             minOneMessage: Boolean = true): FetchDataInfo = {
     log.read(startOffset, maxLength, isolation, minOneMessage)
   }
 
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala 
b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index 965dd2d..329c00f 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -21,7 +21,7 @@ import java.io.{ByteArrayOutputStream, File, PrintWriter}
 import java.nio.ByteBuffer
 import java.util
 import java.util.Properties
-import kafka.log.{Log, LogConfig, LogManager, LogTest}
+import kafka.log.{Log, LogConfig, LogManager, LogTestUtils}
 import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
 import kafka.tools.DumpLogSegments.TimeIndexDumpErrors
 import kafka.utils.{MockTime, TestUtils}
@@ -181,8 +181,8 @@ class DumpLogSegmentsTest {
   @Test
   def testDumpMetadataRecords(): Unit = {
     val mockTime = new MockTime
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
-    val log = LogTest.createLog(logDir, logConfig, new BrokerTopicStats, 
mockTime.scheduler, mockTime)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
+    val log = LogTestUtils.createLog(logDir, logConfig, new BrokerTopicStats, 
mockTime.scheduler, mockTime)
 
     val metadataRecords = Seq(
       new ApiMessageAndVersion(

Reply via email to