This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 24994ef231f KAFKA-20084 Move LogCleanerManagerTest to storage module (#21343)
24994ef231f is described below

commit 24994ef231fe12cd18604125e5e7c27dd34847ff
Author: Dmitry Werner <[email protected]>
AuthorDate: Thu Jan 29 13:24:57 2026 +0500

    KAFKA-20084 Move LogCleanerManagerTest to storage module (#21343)
    
    LogCleanerManagerTest has been moved to the storage module and rewritten
    in Java.
    
    Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../unit/kafka/log/LogCleanerManagerTest.scala     | 901 --------------------
 .../storage/internals/log/LogCleanerManager.java   |  17 +-
 .../internals/log/LogCleanerManagerTest.java       | 928 +++++++++++++++++++++
 .../kafka/storage/internals/log/LogTestUtils.java  |  41 +
 4 files changed, 977 insertions(+), 910 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
deleted file mode 100644
index 0045551d109..00000000000
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ /dev/null
@@ -1,901 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  *    http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package kafka.log
-
-import java.io.File
-import java.nio.file.Files
-import java.util.{Optional, Properties}
-import kafka.utils._
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.compress.Compression
-import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.common.record._
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.coordinator.transaction.TransactionLogConfig
-import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.LogCleaningState.{LOG_CLEANING_ABORTED, LOG_CLEANING_IN_PROGRESS}
-import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
-import org.apache.kafka.storage.internals.log.{AppendOrigin, LocalLog, LogCleanerManager, LogCleaningException, LogCleaningState, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogToClean, PreCleanStats, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
-import org.apache.kafka.storage.log.metrics.BrokerTopicStats
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, Test}
-
-import java.lang.{Long => JLong}
-import java.util
-import java.util.concurrent.ConcurrentHashMap
-import java.util.stream.Collectors
-import scala.jdk.OptionConverters.RichOptional
-
-/**
-  * Unit tests for the log cleaning logic
-  */
-class LogCleanerManagerTest extends Logging {
-
-  val tmpDir: File = TestUtils.tempDir()
-  val tmpDir2: File = TestUtils.tempDir()
-  val logDir: File = TestUtils.randomPartitionLogDir(tmpDir)
-  val logDir2: File = TestUtils.randomPartitionLogDir(tmpDir2)
-  val topicPartition = new TopicPartition("log", 0)
-  val topicPartition2 = new TopicPartition("log2", 0)
-  val logProps = new Properties()
-  logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
-  logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1024: java.lang.Integer)
-  logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
-  val logConfig: LogConfig = new LogConfig(logProps)
-  val time = new MockTime(1400000000000L, 1000L)  // Tue May 13 16:53:20 UTC 2014 for `currentTimeMs`
-  val offset = 999
-  val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false)
-
-  val cleanerCheckpoints: util.HashMap[TopicPartition, JLong] = new util.HashMap[TopicPartition, JLong]()
-
-  class LogCleanerManagerMock(logDirs: util.List[File],
-                              logs: util.concurrent.ConcurrentMap[TopicPartition, UnifiedLog],
-                              logDirFailureChannel: LogDirFailureChannel) extends LogCleanerManager(logDirs, logs, logDirFailureChannel) {
-    override def allCleanerCheckpoints: util.Map[TopicPartition, JLong] = {
-      cleanerCheckpoints
-    }
-
-    override def updateCheckpoints(dataDir: File, partitionToUpdateOrAdd: Optional[util.Map.Entry[TopicPartition, JLong]],
-                                   partitionToRemove: Optional[TopicPartition]): Unit = {
-      assert(partitionToRemove.isEmpty, "partitionToRemove argument with value not yet handled")
-      val entry = partitionToUpdateOrAdd.orElseThrow(() =>
-        new IllegalArgumentException("partitionToUpdateOrAdd==None argument not yet handled"))
-      cleanerCheckpoints.put(entry.getKey, entry.getValue)
-    }
-  }
-
-  @AfterEach
-  def tearDown(): Unit = {
-    Utils.delete(tmpDir)
-  }
-
-  private def setupIncreasinglyFilthyLogs(partitions: Seq[TopicPartition],
-                                          startNumBatches: Int,
-                                          batchIncrement: Int): util.concurrent.ConcurrentMap[TopicPartition, UnifiedLog] = {
-    val logs = new util.concurrent.ConcurrentHashMap[TopicPartition, UnifiedLog]()
-    var numBatches = startNumBatches
-
-    for (tp <- partitions) {
-      val log = createLog(2048, TopicConfig.CLEANUP_POLICY_COMPACT, topicPartition = tp)
-      logs.put(tp, log)
-
-      writeRecords(log, numBatches = numBatches, recordsPerBatch = 1, batchesPerSegment = 5)
-      numBatches += batchIncrement
-    }
-    logs
-  }
-
-  @Test
-  def testGrabFilthiestCompactedLogThrowsException(): Unit = {
-    val tp = new TopicPartition("A", 1)
-    val logSegmentSize = TestUtils.singletonRecords("test".getBytes).sizeInBytes * 10
-    val logSegmentsCount = 2
-    val tpDir = new File(logDir, "A-1")
-    Files.createDirectories(tpDir.toPath)
-    val logDirFailureChannel = new LogDirFailureChannel(10)
-    val config = createLowRetentionLogConfig(logSegmentSize, TopicConfig.CLEANUP_POLICY_COMPACT)
-    val maxTransactionTimeoutMs = 5 * 60 * 1000
-    val producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
-    val segments = new LogSegments(tp)
-    val leaderEpochCache = UnifiedLog.createLeaderEpochCache(
-      tpDir, topicPartition, logDirFailureChannel, Optional.empty, time.scheduler)
-    val producerStateManager = new ProducerStateManager(topicPartition, tpDir, maxTransactionTimeoutMs, producerStateManagerConfig, time)
-    val offsets = new LogLoader(
-      tpDir,
-      tp,
-      config,
-      time.scheduler,
-      time,
-      logDirFailureChannel,
-      true,
-      segments,
-      0L,
-      0L,
-      leaderEpochCache,
-      producerStateManager,
-      new ConcurrentHashMap[String, Integer],
-      false
-    ).load()
-    val localLog = new LocalLog(tpDir, config, segments, offsets.recoveryPoint,
-      offsets.nextOffsetMetadata, time.scheduler, time, tp, logDirFailureChannel)
-    // the exception should be caught and the partition that caused it marked as uncleanable
-    class LogMock extends UnifiedLog(offsets.logStartOffset, localLog, new BrokerTopicStats,
-      producerIdExpirationCheckIntervalMs, leaderEpochCache,
-      producerStateManager, Optional.empty, false, LogOffsetsListener.NO_OP_OFFSETS_LISTENER) {
-      // Throw an error in getFirstBatchTimestampForSegments since it is called in grabFilthiestLog()
-      override def getFirstBatchTimestampForSegments(segments: util.Collection[LogSegment]): util.Collection[java.lang.Long] =
-        throw new IllegalStateException("Error!")
-    }
-
-    val log: UnifiedLog = new LogMock()
-    writeRecords(log = log,
-      numBatches = logSegmentsCount * 2,
-      recordsPerBatch = 10,
-      batchesPerSegment = 2
-    )
-
-    val logsPool = new util.concurrent.ConcurrentHashMap[TopicPartition, UnifiedLog]()
-    logsPool.put(tp, log)
-    val cleanerManager = createCleanerManagerMock(logsPool)
-    cleanerCheckpoints.put(tp, 1)
-
-    val thrownException = assertThrows(classOf[LogCleaningException], () => cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats()).get)
-    assertEquals(log, thrownException.log)
-    assertTrue(thrownException.getCause.isInstanceOf[IllegalStateException])
-  }
-
-  @Test
-  def testGrabFilthiestCompactedLogReturnsLogWithDirtiestRatio(): Unit = {
-    val tp0 = new TopicPartition("wishing-well", 0)
-    val tp1 = new TopicPartition("wishing-well", 1)
-    val tp2 = new TopicPartition("wishing-well", 2)
-    val partitions = Seq(tp0, tp1, tp2)
-
-    // setup logs with cleanable range: [20, 20], [20, 25], [20, 30]
-    val logs = setupIncreasinglyFilthyLogs(partitions, startNumBatches = 20, batchIncrement = 5)
-    val cleanerManager = createCleanerManagerMock(logs)
-    partitions.foreach(partition => cleanerCheckpoints.put(partition, 20))
-
-    val filthiestLog: LogToClean = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats()).get
-    assertEquals(tp2, filthiestLog.topicPartition)
-    assertEquals(tp2, filthiestLog.log.topicPartition)
-  }
-
-  @Test
-  def testGrabFilthiestCompactedLogIgnoresUncleanablePartitions(): Unit = {
-    val tp0 = new TopicPartition("wishing-well", 0)
-    val tp1 = new TopicPartition("wishing-well", 1)
-    val tp2 = new TopicPartition("wishing-well", 2)
-    val partitions = Seq(tp0, tp1, tp2)
-
-    // setup logs with cleanable range: [20, 20], [20, 25], [20, 30]
-    val logs = setupIncreasinglyFilthyLogs(partitions, startNumBatches = 20, batchIncrement = 5)
-    val cleanerManager = createCleanerManagerMock(logs)
-    partitions.foreach(partition => cleanerCheckpoints.put(partition, 20))
-
-    cleanerManager.markPartitionUncleanable(logs.get(tp2).dir.getParent, tp2)
-
-    val filthiestLog: LogToClean = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats()).get
-    assertEquals(tp1, filthiestLog.topicPartition)
-    assertEquals(tp1, filthiestLog.log.topicPartition)
-  }
-
-  @Test
-  def testGrabFilthiestCompactedLogIgnoresInProgressPartitions(): Unit = {
-    val tp0 = new TopicPartition("wishing-well", 0)
-    val tp1 = new TopicPartition("wishing-well", 1)
-    val tp2 = new TopicPartition("wishing-well", 2)
-    val partitions = Seq(tp0, tp1, tp2)
-
-    // setup logs with cleanable range: [20, 20], [20, 25], [20, 30]
-    val logs = setupIncreasinglyFilthyLogs(partitions, startNumBatches = 20, batchIncrement = 5)
-    val cleanerManager = createCleanerManagerMock(logs)
-    partitions.foreach(partition => cleanerCheckpoints.put(partition, 20))
-
-    cleanerManager.setCleaningState(tp2, LOG_CLEANING_IN_PROGRESS)
-
-    val filthiestLog: LogToClean = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats()).get
-    assertEquals(tp1, filthiestLog.topicPartition)
-    assertEquals(tp1, filthiestLog.log.topicPartition)
-  }
-
-  @Test
-  def testGrabFilthiestCompactedLogIgnoresBothInProgressPartitionsAndUncleanablePartitions(): Unit = {
-    val tp0 = new TopicPartition("wishing-well", 0)
-    val tp1 = new TopicPartition("wishing-well", 1)
-    val tp2 = new TopicPartition("wishing-well", 2)
-    val partitions = Seq(tp0, tp1, tp2)
-
-    // setup logs with cleanable range: [20, 20], [20, 25], [20, 30]
-    val logs = setupIncreasinglyFilthyLogs(partitions, startNumBatches = 20, batchIncrement = 5)
-    val cleanerManager = createCleanerManagerMock(logs)
-    partitions.foreach(partition => cleanerCheckpoints.put(partition, 20))
-
-    cleanerManager.setCleaningState(tp2, LOG_CLEANING_IN_PROGRESS)
-    cleanerManager.markPartitionUncleanable(logs.get(tp1).dir.getParent, tp1)
-
-    val filthiestLog: Optional[LogToClean] = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats())
-    assertEquals(Optional.empty(), filthiestLog)
-  }
-
-  @Test
-  def testDirtyOffsetResetIfLargerThanEndOffset(): Unit = {
-    val tp = new TopicPartition("foo", 0)
-    val logs = setupIncreasinglyFilthyLogs(Seq(tp), startNumBatches = 20, batchIncrement = 5)
-    val cleanerManager = createCleanerManagerMock(logs)
-    cleanerCheckpoints.put(tp, 200)
-
-    val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats()).get
-    assertEquals(0L, filthiestLog.firstDirtyOffset)
-  }
-
-  @Test
-  def testDirtyOffsetResetIfSmallerThanStartOffset(): Unit = {
-    val tp = new TopicPartition("foo", 0)
-    val logs = setupIncreasinglyFilthyLogs(Seq(tp), startNumBatches = 20, batchIncrement = 5)
-
-    logs.get(tp).maybeIncrementLogStartOffset(10L, LogStartOffsetIncrementReason.ClientRecordDeletion)
-
-    val cleanerManager = createCleanerManagerMock(logs)
-    cleanerCheckpoints.put(tp, 0L)
-
-    val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats()).get
-    assertEquals(10L, filthiestLog.firstDirtyOffset)
-  }
-
-  @Test
-  def testLogStartOffsetLargerThanActiveSegmentBaseOffset(): Unit = {
-    val tp = new TopicPartition("foo", 0)
-    val log = createLog(segmentSize = 2048, TopicConfig.CLEANUP_POLICY_COMPACT, tp)
-
-    val logs = new util.concurrent.ConcurrentHashMap[TopicPartition, UnifiedLog]()
-    logs.put(tp, log)
-
-    appendRecords(log, numRecords = 3)
-    appendRecords(log, numRecords = 3)
-    appendRecords(log, numRecords = 3)
-
-    assertEquals(1, log.logSegments.size)
-
-    log.maybeIncrementLogStartOffset(2L, LogStartOffsetIncrementReason.ClientRecordDeletion)
-
-    val cleanerManager = createCleanerManagerMock(logs)
-    cleanerCheckpoints.put(tp, 0L)
-
-    // The active segment is uncleanable and hence not filthy from the POV of the CleanerManager.
-    val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats())
-    assertEquals(Optional.empty(), filthiestLog)
-  }
-
-  @Test
-  def testDirtyOffsetLargerThanActiveSegmentBaseOffset(): Unit = {
-    // It is possible in the case of an unclean leader election for the checkpoint
-    // dirty offset to get ahead of the active segment base offset, but still be
-    // within the range of the log.
-
-    val tp = new TopicPartition("foo", 0)
-
-    val logs = new util.concurrent.ConcurrentHashMap[TopicPartition, UnifiedLog]()
-    val log = createLog(2048, TopicConfig.CLEANUP_POLICY_COMPACT, topicPartition = tp)
-    logs.put(tp, log)
-
-    appendRecords(log, numRecords = 3)
-    appendRecords(log, numRecords = 3)
-
-    assertEquals(1, log.logSegments.size)
-    assertEquals(0L, log.activeSegment.baseOffset)
-
-    val cleanerManager = createCleanerManagerMock(logs)
-    cleanerCheckpoints.put(tp, 3L)
-
-    // These segments are uncleanable and hence not filthy
-    val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats())
-    assertEquals(Optional.empty(), filthiestLog)
-  }
-
-  /**
-    * When checking for logs with segments ready for deletion
-    * we shouldn't consider logs where cleanup.policy=delete
-    * as they are handled by the LogManager
-    */
-  @Test
-  def testLogsWithSegmentsToDeleteShouldNotConsiderCleanupPolicyDeleteLogs(): Unit = {
-    val records = TestUtils.singletonRecords("test".getBytes)
-    val log: UnifiedLog = createLog(records.sizeInBytes * 5, TopicConfig.CLEANUP_POLICY_DELETE)
-    val cleanerManager: LogCleanerManager = createCleanerManager(log)
-
-    val readyToDelete = cleanerManager.deletableLogs().size
-    assertEquals(0, readyToDelete, "should have 0 logs ready to be deleted")
-  }
-
-  /**
-    * We should find logs with segments ready to be deleted when cleanup.policy=compact,delete
-    */
-  @Test
-  def testLogsWithSegmentsToDeleteShouldConsiderCleanupPolicyCompactDeleteLogs(): Unit = {
-    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
-    val log: UnifiedLog = createLog(records.sizeInBytes * 5, TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE)
-    val cleanerManager: LogCleanerManager = createCleanerManager(log)
-
-    val readyToDelete = cleanerManager.deletableLogs().size
-    assertEquals(1, readyToDelete, "should have 1 logs ready to be deleted")
-  }
-
-  /**
-    * When looking for logs with segments ready to be deleted we should consider
-    * logs with cleanup.policy=compact because they may have segments from before the log start offset
-    */
-  @Test
-  def testLogsWithSegmentsToDeleteShouldConsiderCleanupPolicyCompactLogs(): Unit = {
-    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
-    val log: UnifiedLog = createLog(records.sizeInBytes * 5, TopicConfig.CLEANUP_POLICY_COMPACT)
-    val cleanerManager: LogCleanerManager = createCleanerManager(log)
-
-    val readyToDelete = cleanerManager.deletableLogs().size
-    assertEquals(1, readyToDelete, "should have 1 logs ready to be deleted")
-  }
-
-  /**
-    * log under cleanup should be ineligible for compaction
-    */
-  @Test
-  def testLogsUnderCleanupIneligibleForCompaction(): Unit = {
-    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
-    val log: UnifiedLog = createLog(records.sizeInBytes * 5, TopicConfig.CLEANUP_POLICY_DELETE)
-    val cleanerManager: LogCleanerManager = createCleanerManager(log)
-
-    log.appendAsLeader(records, 0)
-    log.roll()
-    log.appendAsLeader(TestUtils.singletonRecords("test2".getBytes, key="test2".getBytes), 0)
-    log.updateHighWatermark(2L)
-
-    // simulate cleanup thread working on the log partition
-    val deletableLog = cleanerManager.pauseCleaningForNonCompactedPartitions()
-    assertEquals(1, deletableLog.size, "should have 1 logs ready to be deleted")
-
-    // change cleanup policy from delete to compact
-    val logProps = new Properties()
-    logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, log.config.segmentSize(): Integer)
-    logProps.put(TopicConfig.RETENTION_MS_CONFIG, log.config.retentionMs: java.lang.Long)
-    logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
-    logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, 0: Integer)
-    val config = new LogConfig(logProps)
-    log.updateConfig(config)
-
-    // log cleanup inprogress, the log is not available for compaction
-    val cleanable = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats()).toScala
-    assertEquals(0, cleanable.size, "should have 0 logs ready to be compacted")
-
-    // log cleanup finished, and log can be picked up for compaction
-    cleanerManager.resumeCleaning(
-      deletableLog.stream()
-        .map[TopicPartition](entry => entry.getKey)
-        .collect(Collectors.toSet[TopicPartition]())
-    )
-    val cleanable2 = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats()).toScala
-    assertEquals(1, cleanable2.size, "should have 1 logs ready to be compacted")
-
-    // update cleanup policy to delete
-    logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)
-    val config2 = new LogConfig(logProps)
-    log.updateConfig(config2)
-
-    // compaction in progress, should have 0 log eligible for log cleanup
-    val deletableLog2 = cleanerManager.pauseCleaningForNonCompactedPartitions()
-    assertEquals(0, deletableLog2.size, "should have 0 logs ready to be deleted")
-
-    // compaction done, should have 1 log eligible for log cleanup
-    cleanerManager.doneDeleting(util.List.of(cleanable2.get.topicPartition))
-    val deletableLog3 = cleanerManager.pauseCleaningForNonCompactedPartitions()
-    assertEquals(1, deletableLog3.size, "should have 1 logs ready to be deleted")
-  }
-
-  @Test
-  def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
-    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
-    val log: UnifiedLog = createLog(records.sizeInBytes * 5, TopicConfig.CLEANUP_POLICY_COMPACT)
-    val cleanerManager: LogCleanerManager = createCleanerManager(log)
-
-    // expect the checkpoint offset is not the expectedOffset before doing updateCheckpoints
-    assertNotEquals(offset, cleanerManager.allCleanerCheckpoints.getOrDefault(topicPartition, 0))
-
-    cleanerManager.updateCheckpoints(logDir, Optional.of(util.Map.entry(topicPartition, offset)), Optional.empty())
-    // expect the checkpoint offset is now updated to the expected offset after doing updateCheckpoints
-    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition))
-  }
-
-  @Test
-  def testUpdateCheckpointsShouldRemovePartitionData(): Unit = {
-    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
-    val log: UnifiedLog = createLog(records.sizeInBytes * 5, TopicConfig.CLEANUP_POLICY_COMPACT)
-    val cleanerManager: LogCleanerManager = createCleanerManager(log)
-
-    // write some data into the cleaner-offset-checkpoint file
-    cleanerManager.updateCheckpoints(logDir, Optional.of(util.Map.entry(topicPartition, offset)), Optional.empty())
-    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition))
-
-    // updateCheckpoints should remove the topicPartition data in the logDir
-    cleanerManager.updateCheckpoints(logDir, Optional.empty(), Optional.of(topicPartition))
-    assertFalse(cleanerManager.allCleanerCheckpoints.containsKey(topicPartition))
-  }
-
-  @Test
-  def testHandleLogDirFailureShouldRemoveDirAndData(): Unit = {
-    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
-    val log: UnifiedLog = createLog(records.sizeInBytes * 5, TopicConfig.CLEANUP_POLICY_COMPACT)
-    val cleanerManager: LogCleanerManager = createCleanerManager(log)
-
-    // write some data into the cleaner-offset-checkpoint file in logDir and logDir2
-    cleanerManager.updateCheckpoints(logDir, Optional.of(util.Map.entry(topicPartition, offset)), Optional.empty())
-    cleanerManager.updateCheckpoints(logDir2, Optional.of(util.Map.entry(topicPartition2, offset)), Optional.empty())
-    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition))
-    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition2))
-
-    cleanerManager.handleLogDirFailure(logDir.getAbsolutePath)
-    // verify the partition data in logDir is gone, and data in logDir2 is still there
-    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition2))
-    assertFalse(cleanerManager.allCleanerCheckpoints.containsKey(topicPartition))
-  }
-
-  @Test
-  def testMaybeTruncateCheckpointShouldTruncateData(): Unit = {
-    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
-    val log: UnifiedLog = createLog(records.sizeInBytes * 5, TopicConfig.CLEANUP_POLICY_COMPACT)
-    val cleanerManager: LogCleanerManager = createCleanerManager(log)
-    val lowerOffset = 1L
-    val higherOffset = 1000L
-
-    // write some data into the cleaner-offset-checkpoint file in logDir
-    cleanerManager.updateCheckpoints(logDir, Optional.of(util.Map.entry(topicPartition, offset)), Optional.empty())
-    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition))
-
-    // we should not truncate the checkpoint data for checkpointed offset <= the given offset (higherOffset)
-    cleanerManager.maybeTruncateCheckpoint(logDir, topicPartition, higherOffset)
-    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition))
-    // we should truncate the checkpoint data for checkpointed offset > the given offset (lowerOffset)
-    cleanerManager.maybeTruncateCheckpoint(logDir, topicPartition, lowerOffset)
-    assertEquals(lowerOffset, cleanerManager.allCleanerCheckpoints.get(topicPartition))
-  }
-
-  @Test
-  def testAlterCheckpointDirShouldRemoveDataInSrcDirAndAddInNewDir(): Unit = {
-    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
-    val log: UnifiedLog = createLog(records.sizeInBytes * 5, TopicConfig.CLEANUP_POLICY_COMPACT)
-    val cleanerManager: LogCleanerManager = createCleanerManager(log)
-
-    // write some data into the cleaner-offset-checkpoint file in logDir
-    cleanerManager.updateCheckpoints(logDir, Optional.of(util.Map.entry(topicPartition, offset)), Optional.empty())
-    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition))
-
-    cleanerManager.alterCheckpointDir(topicPartition, logDir, logDir2)
-    // verify we still can get the partition offset after alterCheckpointDir
-    // This data should locate in logDir2, not logDir
-    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition))
-
-    // force delete the logDir2 from checkpoints, so that the partition data should also be deleted
-    cleanerManager.handleLogDirFailure(logDir2.getAbsolutePath)
-    assertFalse(cleanerManager.allCleanerCheckpoints.containsKey(topicPartition))
-  }
-
-  /**
-    * log under cleanup should still be eligible for log truncation
-    */
-  @Test
-  def testConcurrentLogCleanupAndLogTruncation(): Unit = {
-    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
-    val log: UnifiedLog = createLog(records.sizeInBytes * 5, TopicConfig.CLEANUP_POLICY_DELETE)
-    val cleanerManager: LogCleanerManager = createCleanerManager(log)
-
-    // log cleanup starts
-    val pausedPartitions = cleanerManager.pauseCleaningForNonCompactedPartitions()
-    // Log truncation happens due to unclean leader election
-    cleanerManager.abortAndPauseCleaning(log.topicPartition)
-    cleanerManager.resumeCleaning(util.Set.of(log.topicPartition))
-    // log cleanup finishes and pausedPartitions are resumed
-    cleanerManager.resumeCleaning(
-      pausedPartitions.stream()
-        .map[TopicPartition](entry => entry.getKey)
-        .collect(Collectors.toSet[TopicPartition]())
-    )
-
-    assertEquals(Optional.empty(), cleanerManager.cleaningState(log.topicPartition))
-  }
-
-  /**
-    * log under cleanup should still be eligible for topic deletion
-    */
-  @Test
-  def testConcurrentLogCleanupAndTopicDeletion(): Unit = {
-    val records = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes)
-    val log: UnifiedLog = createLog(records.sizeInBytes * 5, TopicConfig.CLEANUP_POLICY_DELETE)
-    val cleanerManager: LogCleanerManager = createCleanerManager(log)
-
-    // log cleanup starts
-    val pausedPartitions = cleanerManager.pauseCleaningForNonCompactedPartitions()
-    // Broker processes StopReplicaRequest with delete=true
-    cleanerManager.abortCleaning(log.topicPartition)
-    // log cleanup finishes and pausedPartitions are resumed
-    cleanerManager.resumeCleaning(
-      pausedPartitions.stream()
-        .map[TopicPartition](entry => entry.getKey)
-        .collect(Collectors.toSet[TopicPartition]())
-    )
-
-    assertEquals(Optional.empty(), cleanerManager.cleaningState(log.topicPartition))
-  }
-
-  /**
-    * When looking for logs with segments ready to be deleted we shouldn't consider
-    * logs that have had their partition marked as uncleanable.
-    */
-  @Test
-  def testLogsWithSegmentsToDeleteShouldNotConsiderUncleanablePartitions(): Unit = {
-    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
-    val log: UnifiedLog = createLog(records.sizeInBytes * 5, TopicConfig.CLEANUP_POLICY_COMPACT)
-    val cleanerManager: LogCleanerManager = createCleanerManager(log)
-    cleanerManager.markPartitionUncleanable(log.dir.getParent, topicPartition)
-
-    val readyToDelete = cleanerManager.deletableLogs().size
-    assertEquals(0, readyToDelete, "should have 0 logs ready to be deleted")
-  }
-
-  /**
-    * Test computation of cleanable range with no minimum compaction lag settings active where bounded by LSO
-    */
-  @Test
-  def testCleanableOffsetsForNone(): Unit = {
-    val logProps = new Properties()
-    logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
-
-    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
-
-    while (log.numberOfSegments < 8)
-      log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), 0)
-
-    log.updateHighWatermark(50)
-
-    val lastCleanOffset = Optional.of(0L.asInstanceOf[JLong])
-    val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
-    assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable offset starts at the beginning of the log.")
-    assertEquals(log.highWatermark, log.lastStableOffset, "The high watermark equals the last stable offset as no transactions are in progress")
-    assertEquals(log.lastStableOffset, cleanableOffsets.firstUncleanableDirtyOffset, "The first uncleanable offset is bounded by the last stable offset.")
-  }
-
-  /**
-   * Test computation of cleanable range with no minimum compaction lag settings active where bounded by active segment
-   */
-  @Test
-  def testCleanableOffsetsActiveSegment(): Unit = {
-    val logProps = new Properties()
-    logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
-
-    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
-
-    while (log.numberOfSegments < 8)
-      log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), 0)
-
-    log.updateHighWatermark(log.logEndOffset)
-
-    val lastCleanOffset = Optional.of(0L.asInstanceOf[JLong])
-    val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
-    assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable offset starts at the beginning of the log.")
-    assertEquals(log.activeSegment.baseOffset, cleanableOffsets.firstUncleanableDirtyOffset, "The first uncleanable offset begins with the active segment.")
-  }
-
-  /**
-    * Test computation of cleanable range with a minimum compaction lag time
-    */
-  @Test
-  def testCleanableOffsetsForTime(): Unit = {
-    val compactionLag = 60 * 60 * 1000
-    val logProps = new Properties()
-    logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
-    logProps.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, compactionLag: java.lang.Integer)
-
-    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
-
-    val t0 = time.milliseconds
-    while (log.numberOfSegments < 4)
-      log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, t0), 0)
-
-    val activeSegAtT0 = log.activeSegment
-
-    time.sleep(compactionLag + 1)
-    val t1 = time.milliseconds
-
-    while (log.numberOfSegments < 8)
-      log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, t1), 0)
-
-    log.updateHighWatermark(log.logEndOffset)
-
-    val lastCleanOffset = Optional.of(0L.asInstanceOf[JLong])
-    val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
-    assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable offset starts at the beginning of the log.")
-    assertEquals(activeSegAtT0.baseOffset, cleanableOffsets.firstUncleanableDirtyOffset, "The first uncleanable offset begins with the second block of log entries.")
-  }
-
-  /**
-    * Test computation of cleanable range with a minimum compaction lag time that is small enough that
-    * the active segment contains it.
-    */
-  @Test
-  def testCleanableOffsetsForShortTime(): Unit = {
-    val compactionLag = 60 * 60 * 1000
-    val logProps = new Properties()
-    logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
-    logProps.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, compactionLag: java.lang.Integer)
-
-    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
-
-    val t0 = time.milliseconds
-    while (log.numberOfSegments < 8)
-      log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, t0), 0)
-
-    log.updateHighWatermark(log.logEndOffset)
-
-    time.sleep(compactionLag + 1)
-
-    val lastCleanOffset = Optional.of(0L.asInstanceOf[JLong])
-    val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
-    assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable offset starts at the beginning of the log.")
-    assertEquals(log.activeSegment.baseOffset, cleanableOffsets.firstUncleanableDirtyOffset, "The first uncleanable offset begins with active segment.")
-  }
-
-  @Test
-  def testCleanableOffsetsNeedsCheckpointReset(): Unit = {
-    val tp = new TopicPartition("foo", 0)
-    val logs = setupIncreasinglyFilthyLogs(Seq(tp), startNumBatches = 20, batchIncrement = 5)
-    logs.get(tp).maybeIncrementLogStartOffset(10L, LogStartOffsetIncrementReason.ClientRecordDeletion)
-
-    var lastCleanOffset = Optional.of(15L.asInstanceOf[JLong])
-    var cleanableOffsets = LogCleanerManager.cleanableOffsets(logs.get(tp), lastCleanOffset, time.milliseconds)
-    assertFalse(cleanableOffsets.forceUpdateCheckpoint, "Checkpoint offset should not be reset if valid")
-
-    logs.get(tp).maybeIncrementLogStartOffset(20L, LogStartOffsetIncrementReason.ClientRecordDeletion)
-    cleanableOffsets = LogCleanerManager.cleanableOffsets(logs.get(tp), lastCleanOffset, time.milliseconds)
-    assertTrue(cleanableOffsets.forceUpdateCheckpoint, "Checkpoint offset needs to be reset if less than log start offset")
-
-    lastCleanOffset = Optional.of(25L)
-    cleanableOffsets = LogCleanerManager.cleanableOffsets(logs.get(tp), lastCleanOffset, time.milliseconds)
-    assertTrue(cleanableOffsets.forceUpdateCheckpoint, "Checkpoint offset needs to be reset if greater than log end offset")
-  }
-
-  @Test
-  def testUndecidedTransactionalDataNotCleanable(): Unit = {
-    val compactionLag = 60 * 60 * 1000
-    val logProps = new Properties()
-    logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
-    logProps.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, compactionLag: java.lang.Integer)
-
-    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
-
-    val producerId = 15L
-    val producerEpoch = 0.toShort
-    val sequence = 0
-    log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence,
-      new SimpleRecord(time.milliseconds(), "1".getBytes, "a".getBytes),
-      new SimpleRecord(time.milliseconds(), "2".getBytes, "b".getBytes)), 0)
-    log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence + 2,
-      new SimpleRecord(time.milliseconds(), "3".getBytes, "c".getBytes)), 0)
-    log.roll()
-    log.updateHighWatermark(3L)
-
-    time.sleep(compactionLag + 1)
-    // although the compaction lag has been exceeded, the undecided data should not be cleaned
-    var cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Optional.of(0L), time.milliseconds())
-    assertEquals(0L, cleanableOffsets.firstDirtyOffset)
-    assertEquals(0L, cleanableOffsets.firstUncleanableDirtyOffset)
-
-    log.appendAsLeader(MemoryRecords.withEndTransactionMarker(time.milliseconds(), producerId, producerEpoch,
-      new EndTransactionMarker(ControlRecordType.ABORT, 15)), 0,
-      AppendOrigin.COORDINATOR, RequestLocal.noCaching(), VerificationGuard.SENTINEL, TransactionVersion.TV_1.featureLevel())
-    log.roll()
-    log.updateHighWatermark(4L)
-
-    // the first segment should now become cleanable immediately
-    cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Optional.of(0L), time.milliseconds())
-    assertEquals(0L, cleanableOffsets.firstDirtyOffset)
-    assertEquals(3L, cleanableOffsets.firstUncleanableDirtyOffset)
-
-    time.sleep(compactionLag + 1)
-
-    // the second segment becomes cleanable after the compaction lag
-    cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Optional.of(0L), time.milliseconds())
-    assertEquals(0L, cleanableOffsets.firstDirtyOffset)
-    assertEquals(4L, cleanableOffsets.firstUncleanableDirtyOffset)
-  }
-
-  @Test
-  def testDoneCleaning(): Unit = {
-    val logProps = new Properties()
-    logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
-    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
-    while (log.numberOfSegments < 8)
-      log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), 0)
-
-    val cleanerManager: LogCleanerManager = createCleanerManager(log)
-
-    assertThrows(classOf[IllegalStateException], () => cleanerManager.doneCleaning(topicPartition, log.dir, 1))
-
-    cleanerManager.setCleaningState(topicPartition, LogCleaningState.logCleaningPaused(1))
-    assertThrows(classOf[IllegalStateException], () => cleanerManager.doneCleaning(topicPartition, log.dir, 1))
-
-    cleanerManager.setCleaningState(topicPartition, LOG_CLEANING_IN_PROGRESS)
-    val endOffset = 1L
-    cleanerManager.doneCleaning(topicPartition, log.dir, endOffset)
-    assertTrue(cleanerManager.cleaningState(topicPartition).isEmpty)
-    assertTrue(cleanerManager.allCleanerCheckpoints.containsKey(topicPartition))
-    assertEquals(Some(endOffset), Option(cleanerManager.allCleanerCheckpoints.get(topicPartition)))
-
-    cleanerManager.setCleaningState(topicPartition, LOG_CLEANING_ABORTED)
-    cleanerManager.doneCleaning(topicPartition, log.dir, endOffset)
-    assertEquals(LogCleaningState.logCleaningPaused(1), cleanerManager.cleaningState(topicPartition).get)
-    assertTrue(cleanerManager.allCleanerCheckpoints.containsKey(topicPartition))
-  }
-
-  @Test
-  def testDoneDeleting(): Unit = {
-    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
-    val log: UnifiedLog = createLog(records.sizeInBytes * 5, TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE)
-    val cleanerManager: LogCleanerManager = createCleanerManager(log)
-    val tp = new TopicPartition("log", 0)
-
-    assertThrows(classOf[IllegalStateException], () => cleanerManager.doneDeleting(util.List.of(tp)))
-
-    cleanerManager.setCleaningState(tp, LogCleaningState.logCleaningPaused(1))
-    assertThrows(classOf[IllegalStateException], () => cleanerManager.doneDeleting(util.List.of(tp)))
-
-    cleanerManager.setCleaningState(tp, LOG_CLEANING_IN_PROGRESS)
-    cleanerManager.doneDeleting(util.List.of(tp))
-    assertTrue(cleanerManager.cleaningState(tp).isEmpty)
-
-    cleanerManager.setCleaningState(tp, LOG_CLEANING_ABORTED)
-    cleanerManager.doneDeleting(util.List.of(tp))
-    assertEquals(LogCleaningState.logCleaningPaused(1), cleanerManager.cleaningState(tp).get)
-  }
-
-  /**
-   * Logs with invalid checkpoint offsets should update their checkpoint offset even if the log doesn't need cleaning
-   */
-  @Test
-  def testCheckpointUpdatedForInvalidOffsetNoCleaning(): Unit = {
-    val tp = new TopicPartition("foo", 0)
-    val logs = setupIncreasinglyFilthyLogs(Seq(tp), startNumBatches = 20, batchIncrement = 5)
-
-    logs.get(tp).maybeIncrementLogStartOffset(20L, LogStartOffsetIncrementReason.ClientRecordDeletion)
-    val cleanerManager = createCleanerManagerMock(logs)
-    cleanerCheckpoints.put(tp, 15L)
-
-    val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats())
-    assertEquals(Optional.empty(), filthiestLog, "Log should not be selected for cleaning")
-    assertEquals(20L, cleanerCheckpoints.get(tp), "Unselected log should have checkpoint offset updated")
-  }
-
-  /**
-   * Logs with invalid checkpoint offsets should update their checkpoint offset even if they aren't selected
-   * for immediate cleaning
-   */
-  @Test
-  def testCheckpointUpdatedForInvalidOffsetNotSelected(): Unit = {
-    val tp0 = new TopicPartition("foo", 0)
-    val tp1 = new TopicPartition("foo", 1)
-    val partitions = Seq(tp0, tp1)
-
-    // create two logs, one with an invalid offset, and one that is dirtier than the log with an invalid offset
-    val logs = setupIncreasinglyFilthyLogs(partitions, startNumBatches = 20, batchIncrement = 5)
-    logs.get(tp0).maybeIncrementLogStartOffset(15L, LogStartOffsetIncrementReason.ClientRecordDeletion)
-    val cleanerManager = createCleanerManagerMock(logs)
-    cleanerCheckpoints.put(tp0, 10L)
-    cleanerCheckpoints.put(tp1, 5L)
-
-    val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats()).get
-    assertEquals(tp1, filthiestLog.topicPartition, "Dirtier log should be selected")
-    assertEquals(15L, cleanerCheckpoints.get(tp0), "Unselected log should have checkpoint offset updated")
-  }
-
-  private def createCleanerManager(log: UnifiedLog): LogCleanerManager = {
-    val logs = new util.concurrent.ConcurrentHashMap[TopicPartition, UnifiedLog]()
-    logs.put(topicPartition, log)
-    new LogCleanerManager(util.List.of(logDir, logDir2), logs, null)
-  }
-
-  private def createCleanerManagerMock(pool: util.concurrent.ConcurrentMap[TopicPartition, UnifiedLog]): LogCleanerManagerMock = {
-    new LogCleanerManagerMock(util.List.of(logDir), pool, null)
-  }
-
-  private def createLog(segmentSize: Int,
-                        cleanupPolicy: String,
-                        topicPartition: TopicPartition = new TopicPartition("log", 0)): UnifiedLog = {
-    val config = createLowRetentionLogConfig(segmentSize, cleanupPolicy)
-    val partitionDir = new File(logDir, UnifiedLog.logDirName(topicPartition))
-
-    UnifiedLog.create(
-      partitionDir,
-      config,
-      0L,
-      0L,
-      time.scheduler,
-      new BrokerTopicStats,
-      time,
-      5 * 60 * 1000,
-      producerStateManagerConfig,
-      TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
-      new LogDirFailureChannel(10),
-      true,
-      Optional.empty)
-  }
-
-  private def createLowRetentionLogConfig(segmentSize: Int, cleanupPolicy: String): LogConfig = {
-    val logProps = new Properties()
-    logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, segmentSize: Integer)
-    logProps.put(TopicConfig.RETENTION_MS_CONFIG, 1: Integer)
-    logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, cleanupPolicy)
-    logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, 0.05: java.lang.Double) // small for easier and clearer tests
-
-    new LogConfig(logProps)
-  }
-
-  private def writeRecords(log: UnifiedLog,
-                           numBatches: Int,
-                           recordsPerBatch: Int,
-                           batchesPerSegment: Int): Unit = {
-    for (i <- 0 until numBatches) {
-      appendRecords(log, recordsPerBatch)
-      if (i % batchesPerSegment == 0)
-        log.roll()
-    }
-    log.roll()
-  }
-
-  private def appendRecords(log: UnifiedLog, numRecords: Int): Unit = {
-    val startOffset = log.logEndOffset
-    val endOffset = startOffset + numRecords
-    var lastTimestamp = 0L
-    val records = (startOffset until endOffset).map { offset =>
-      val currentTimestamp = time.milliseconds()
-      if (offset == endOffset - 1)
-        lastTimestamp = currentTimestamp
-      new SimpleRecord(currentTimestamp, s"key-$offset".getBytes, s"value-$offset".getBytes)
-    }
-
-    log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, records:_*), 1)
-    log.maybeIncrementHighWatermark(log.logEndOffsetMetadata)
-  }
-
-  private def makeLog(dir: File = logDir, config: LogConfig) = {
-    UnifiedLog.create(
-      dir,
-      config,
-      0L,
-      0L,
-      time.scheduler,
-      new BrokerTopicStats,
-      time,
-      5 * 60 * 1000,
-      producerStateManagerConfig,
-      TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
-      new LogDirFailureChannel(10),
-      true,
-      Optional.empty
-    )
-  }
-
-  private def records(key: Int, value: Int, timestamp: Long) =
-    MemoryRecords.withRecords(Compression.NONE, new SimpleRecord(timestamp, key.toString.getBytes, value.toString.getBytes))
-
-}
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleanerManager.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleanerManager.java
index 0b8c2b398d0..d7781a30254 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleanerManager.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleanerManager.java
@@ -219,16 +219,16 @@ public class LogCleanerManager {
     }
 
     /**
-     * Public for unit test. Get the cleaning state of the partition.
+     * For testing only. Get the cleaning state of the partition.
      */
-    public Optional<LogCleaningState> cleaningState(TopicPartition tp) {
+    Optional<LogCleaningState> cleaningState(TopicPartition tp) {
         return inLock(lock, () -> Optional.ofNullable(inProgress.get(tp)));
     }
 
     /**
-     * Public for unit test. Set the cleaning state of the partition.
+     * For testing only. Set the cleaning state of the partition.
      */
-    public void setCleaningState(TopicPartition tp, LogCleaningState state) {
+    void setCleaningState(TopicPartition tp, LogCleaningState state) {
         inLock(lock, () -> inProgress.put(tp, state));
     }
 
@@ -612,7 +612,7 @@ public class LogCleanerManager {
     }
 
     /**
-     * Returns an immutable set of the uncleanable partitions for a given log directory.
+     * For testing only. Returns an immutable set of the uncleanable partitions for a given log directory.
      * Only used for testing.
      */
     public Set<TopicPartition> uncleanablePartitions(String logDir) {
@@ -689,7 +689,8 @@ public class LogCleanerManager {
      * @return OffsetsToClean containing offsets for cleanable portion of log and whether the log checkpoint needs updating
      * @throws IOException    if an I/O error occurs
      */
-    public static OffsetsToClean cleanableOffsets(UnifiedLog log, Optional<Long> lastCleanOffset, long now) throws IOException {
+    // Visible for testing
+    static OffsetsToClean cleanableOffsets(UnifiedLog log, Optional<Long> lastCleanOffset, long now) throws IOException {
         // If the log segments are abnormally truncated and hence the checkpointed offset is no longer valid;
         // reset to the log starting offset and log the error
 
@@ -791,8 +792,6 @@ public class LogCleanerManager {
      * @param forceUpdateCheckpoint       whether to update the checkpoint associated with this log. if true, checkpoint should be
      *                                    reset to firstDirtyOffset
      */
-    public record OffsetsToClean(long firstDirtyOffset, long firstUncleanableDirtyOffset,
-                                 boolean forceUpdateCheckpoint) {
-
+    record OffsetsToClean(long firstDirtyOffset, long firstUncleanableDirtyOffset, boolean forceUpdateCheckpoint) {
     }
 }
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerManagerTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerManagerTest.java
new file mode 100644
index 00000000000..2918d4157a2
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerManagerTest.java
@@ -0,0 +1,928 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.server.common.RequestLocal;
+import org.apache.kafka.server.common.TransactionVersion;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.apache.kafka.storage.internals.log.LogCleanerManager.OffsetsToClean;
+import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT;
+import static org.apache.kafka.storage.internals.log.LogCleaningState.LOG_CLEANING_ABORTED;
+import static org.apache.kafka.storage.internals.log.LogCleaningState.LOG_CLEANING_IN_PROGRESS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests for the log cleaning logic.
+ */
+class LogCleanerManagerTest {
+    private static final TopicPartition TOPIC_PARTITION = new TopicPartition("log", 0);
+    private static final TopicPartition TOPIC_PARTITION_2 = new TopicPartition("log2", 0);
+    private static final LogConfig LOG_CONFIG = createLogConfig();
+    private static final MockTime TIME = new MockTime(1400000000000L, 1000L);  // Tue May 13 16:53:20 UTC 2014 for `currentTimeMs`
+    private static final long OFFSET = 999;
+    private static final ProducerStateManagerConfig PRODUCER_STATE_MANAGER_CONFIG =
+        new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false);
+
+    private File tmpDir;
+    private File tmpDir2;
+    private File logDir;
+    private File logDir2;
+
+    static class LogCleanerManagerMock extends LogCleanerManager {
+        private final Map<TopicPartition, Long> cleanerCheckpoints = new HashMap<>();
+
+        LogCleanerManagerMock(
+            List<File> logDirs,
+            ConcurrentMap<TopicPartition, UnifiedLog> logs,
+            LogDirFailureChannel logDirFailureChannel
+        ) {
+            super(logDirs, logs, logDirFailureChannel);
+        }
+
+        @Override
+        public Map<TopicPartition, Long> allCleanerCheckpoints() {
+            return cleanerCheckpoints;
+        }
+
+        @Override
+        public void updateCheckpoints(
+            File dataDir,
+            Optional<Map.Entry<TopicPartition, Long>> partitionToUpdateOrAdd,
+            Optional<TopicPartition> partitionToRemove
+        ) {
+            assert partitionToRemove.isEmpty() : "partitionToRemove argument with value not yet handled";
+
+            Map.Entry<TopicPartition, Long> entry = partitionToUpdateOrAdd.orElseThrow(() ->
+                new IllegalArgumentException("Empty 'partitionToUpdateOrAdd' argument not yet handled"));
+
+            addCheckpoint(entry.getKey(), entry.getValue());
+        }
+
+        void addCheckpoint(TopicPartition partition, long offset) {
+            cleanerCheckpoints.put(partition, offset);
+        }
+
+        long checkpointOffset(TopicPartition partition) {
+            return cleanerCheckpoints.get(partition);
+        }
+    }
+
+    // the exception should be caught and the partition that caused it marked as uncleanable
+    static class LogMock extends UnifiedLog {
+
+        LogMock(
+            long logStartOffset,
+            LocalLog localLog,
+            BrokerTopicStats brokerTopicStats,
+            int producerIdExpirationCheckIntervalMs,
+            LeaderEpochFileCache leaderEpochCache,
+            ProducerStateManager producerStateManager,
+            Optional<Uuid> topicId,
+            boolean remoteStorageSystemEnable,
+            LogOffsetsListener logOffsetsListener
+        ) throws IOException {
+            super(logStartOffset, localLog, brokerTopicStats, producerIdExpirationCheckIntervalMs, leaderEpochCache,
+                producerStateManager, topicId, remoteStorageSystemEnable, logOffsetsListener);
+        }
+
+        // Throw an error in getFirstBatchTimestampForSegments since it is called in grabFilthiestLog()
+        @Override
+        public Collection<Long> getFirstBatchTimestampForSegments(Collection<LogSegment> segments) {
+            throw new IllegalStateException("Error!");
+        }
+    }
+
+    @BeforeEach
+    public void setup() {
+        tmpDir = TestUtils.tempDirectory();
+        tmpDir2 = TestUtils.tempDirectory();
+        logDir = TestUtils.randomPartitionLogDir(tmpDir);
+        logDir2 = TestUtils.randomPartitionLogDir(tmpDir2);
+    }
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        Utils.delete(tmpDir);
+        Utils.delete(tmpDir2);
+    }
+
+    private ConcurrentMap<TopicPartition, UnifiedLog> setupIncreasinglyFilthyLogs(List<TopicPartition> partitions) throws IOException {
+        ConcurrentMap<TopicPartition, UnifiedLog> logs = new ConcurrentHashMap<>();
+        int numBatches = 20;
+
+        for (TopicPartition tp : partitions) {
+            UnifiedLog log = createLog(2048, TopicConfig.CLEANUP_POLICY_COMPACT, tp);
+            logs.put(tp, log);
+
+            writeRecords(log, numBatches, 1, 5);
+            numBatches += 5;
+        }
+
+        return logs;
+    }
+
+    @Test
+    public void testGrabFilthiestCompactedLogThrowsException() throws 
IOException {
+        TopicPartition tp = new TopicPartition("A", 1);
+        int logSegmentSize = LogTestUtils.singletonRecords("test".getBytes(), 
null).sizeInBytes() * 10;
+        int logSegmentsCount = 2;
+        File tpDir = new File(logDir, "A-1");
+        Files.createDirectories(tpDir.toPath());
+
+        LogDirFailureChannel logDirFailureChannel = new 
LogDirFailureChannel(10);
+        LogConfig config = createLowRetentionLogConfig(logSegmentSize, 
TopicConfig.CLEANUP_POLICY_COMPACT);
+        LogSegments segments = new LogSegments(tp);
+        LeaderEpochFileCache leaderEpochCache = 
UnifiedLog.createLeaderEpochCache(tpDir, TOPIC_PARTITION, logDirFailureChannel,
+            Optional.empty(), TIME.scheduler);
+        ProducerStateManager producerStateManager = new 
ProducerStateManager(TOPIC_PARTITION, tpDir, 5 * 60 * 1000,
+            PRODUCER_STATE_MANAGER_CONFIG, TIME);
+        LoadedLogOffsets offsets = new LogLoader(tpDir, tp, config, TIME.scheduler, TIME, logDirFailureChannel, true,
+            segments, 0L, 0L, leaderEpochCache, producerStateManager, new ConcurrentHashMap<>(), false).load();
+        LocalLog localLog = new LocalLog(tpDir, config, segments, offsets.recoveryPoint(), offsets.nextOffsetMetadata(),
+            TIME.scheduler, TIME, tp, logDirFailureChannel);
+        UnifiedLog log = new LogMock(offsets.logStartOffset(), localLog, new BrokerTopicStats(), PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+            leaderEpochCache, producerStateManager, Optional.empty(), false, LogOffsetsListener.NO_OP_OFFSETS_LISTENER);
+
+        writeRecords(log, logSegmentsCount * 2, 10, 2);
+
+        ConcurrentMap<TopicPartition, UnifiedLog> logsPool = new ConcurrentHashMap<>();
+        logsPool.put(tp, log);
+        LogCleanerManagerMock cleanerManager = createCleanerManagerMock(logsPool);
+        cleanerManager.addCheckpoint(tp, 1L);
+
+        LogCleaningException thrownException = assertThrows(LogCleaningException.class,
+            () -> cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats()).get());
+
+        assertEquals(log, thrownException.log);
+        assertInstanceOf(IllegalStateException.class, thrownException.getCause());
+    }
+
+    @Test
+    public void testGrabFilthiestCompactedLogReturnsLogWithDirtiestRatio() throws IOException {
+        TopicPartition tp0 = new TopicPartition("wishing-well", 0);
+        TopicPartition tp1 = new TopicPartition("wishing-well", 1);
+        TopicPartition tp2 = new TopicPartition("wishing-well", 2);
+        List<TopicPartition> partitions = List.of(tp0, tp1, tp2);
+
+        // setup logs with cleanable range: [20, 20], [20, 25], [20, 30]
+        ConcurrentMap<TopicPartition, UnifiedLog> logs = setupIncreasinglyFilthyLogs(partitions);
+        LogCleanerManagerMock cleanerManager = createCleanerManagerMock(logs);
+        partitions.forEach(partition -> cleanerManager.addCheckpoint(partition, 20L));
+
+        LogToClean filthiestLog = cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats()).get();
+        assertEquals(tp2, filthiestLog.topicPartition());
+        assertEquals(tp2, filthiestLog.log().topicPartition());
+    }
+
+    @Test
+    public void testGrabFilthiestCompactedLogIgnoresUncleanablePartitions() throws IOException {
+        TopicPartition tp0 = new TopicPartition("wishing-well", 0);
+        TopicPartition tp1 = new TopicPartition("wishing-well", 1);
+        TopicPartition tp2 = new TopicPartition("wishing-well", 2);
+        List<TopicPartition> partitions = List.of(tp0, tp1, tp2);
+
+        // setup logs with cleanable range: [20, 20], [20, 25], [20, 30]
+        ConcurrentMap<TopicPartition, UnifiedLog> logs = setupIncreasinglyFilthyLogs(partitions);
+        LogCleanerManagerMock cleanerManager = createCleanerManagerMock(logs);
+        partitions.forEach(partition -> cleanerManager.addCheckpoint(partition, 20L));
+
+        cleanerManager.markPartitionUncleanable(logs.get(tp2).dir().getParent(), tp2);
+
+        LogToClean filthiestLog = cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats()).get();
+        assertEquals(tp1, filthiestLog.topicPartition());
+        assertEquals(tp1, filthiestLog.log().topicPartition());
+    }
+
+    @Test
+    public void testGrabFilthiestCompactedLogIgnoresInProgressPartitions() throws IOException {
+        TopicPartition tp0 = new TopicPartition("wishing-well", 0);
+        TopicPartition tp1 = new TopicPartition("wishing-well", 1);
+        TopicPartition tp2 = new TopicPartition("wishing-well", 2);
+        List<TopicPartition> partitions = List.of(tp0, tp1, tp2);
+
+        // setup logs with cleanable range: [20, 20], [20, 25], [20, 30]
+        ConcurrentMap<TopicPartition, UnifiedLog> logs = setupIncreasinglyFilthyLogs(partitions);
+        LogCleanerManagerMock cleanerManager = createCleanerManagerMock(logs);
+        partitions.forEach(partition -> cleanerManager.addCheckpoint(partition, 20L));
+
+        cleanerManager.setCleaningState(tp2, LOG_CLEANING_IN_PROGRESS);
+
+        LogToClean filthiestLog = cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats()).get();
+        assertEquals(tp1, filthiestLog.topicPartition());
+        assertEquals(tp1, filthiestLog.log().topicPartition());
+    }
+
+    @Test
+    public void testGrabFilthiestCompactedLogIgnoresBothInProgressPartitionsAndUncleanablePartitions() throws IOException {
+        TopicPartition tp0 = new TopicPartition("wishing-well", 0);
+        TopicPartition tp1 = new TopicPartition("wishing-well", 1);
+        TopicPartition tp2 = new TopicPartition("wishing-well", 2);
+        List<TopicPartition> partitions = List.of(tp0, tp1, tp2);
+
+        // setup logs with cleanable range: [20, 20], [20, 25], [20, 30]
+        ConcurrentMap<TopicPartition, UnifiedLog> logs = setupIncreasinglyFilthyLogs(partitions);
+        LogCleanerManagerMock cleanerManager = createCleanerManagerMock(logs);
+        partitions.forEach(partition -> cleanerManager.addCheckpoint(partition, 20L));
+
+        cleanerManager.setCleaningState(tp2, LOG_CLEANING_IN_PROGRESS);
+        cleanerManager.markPartitionUncleanable(logs.get(tp1).dir().getParent(), tp1);
+
+        Optional<LogToClean> filthiestLog = cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats());
+        assertEquals(Optional.empty(), filthiestLog);
+    }
+
+    @Test
+    public void testDirtyOffsetResetIfLargerThanEndOffset() throws IOException {
+        TopicPartition tp = new TopicPartition("foo", 0);
+        ConcurrentMap<TopicPartition, UnifiedLog> logs = setupIncreasinglyFilthyLogs(List.of(tp));
+        LogCleanerManagerMock cleanerManager = createCleanerManagerMock(logs);
+        cleanerManager.addCheckpoint(tp, 200L);
+
+        LogToClean filthiestLog = cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats()).get();
+        assertEquals(0L, filthiestLog.firstDirtyOffset());
+    }
+
+    @Test
+    public void testDirtyOffsetResetIfSmallerThanStartOffset() throws IOException {
+        TopicPartition tp = new TopicPartition("foo", 0);
+        ConcurrentMap<TopicPartition, UnifiedLog> logs = setupIncreasinglyFilthyLogs(List.of(tp));
+
+        logs.get(tp).maybeIncrementLogStartOffset(10L, LogStartOffsetIncrementReason.ClientRecordDeletion);
+
+        LogCleanerManagerMock cleanerManager = createCleanerManagerMock(logs);
+        cleanerManager.addCheckpoint(tp, 0L);
+
+        LogToClean filthiestLog = cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats()).get();
+        assertEquals(10L, filthiestLog.firstDirtyOffset());
+    }
+
+    @Test
+    public void testLogStartOffsetLargerThanActiveSegmentBaseOffset() throws IOException {
+        TopicPartition tp = new TopicPartition("foo", 0);
+        UnifiedLog log = createLog(2048, TopicConfig.CLEANUP_POLICY_COMPACT, tp);
+
+        ConcurrentMap<TopicPartition, UnifiedLog> logs = new ConcurrentHashMap<>();
+        logs.put(tp, log);
+
+        appendRecords(log, 3);
+        appendRecords(log, 3);
+        appendRecords(log, 3);
+
+        assertEquals(1, log.logSegments().size());
+
+        log.maybeIncrementLogStartOffset(2L, LogStartOffsetIncrementReason.ClientRecordDeletion);
+
+        LogCleanerManagerMock cleanerManager = createCleanerManagerMock(logs);
+        cleanerManager.addCheckpoint(tp, 0L);
+
+        // The active segment is uncleanable and hence not filthy from the POV of the CleanerManager.
+        Optional<LogToClean> filthiestLog = cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats());
+        assertEquals(Optional.empty(), filthiestLog);
+    }
+
+    @Test
+    public void testDirtyOffsetLargerThanActiveSegmentBaseOffset() throws IOException {
+        // It is possible in the case of an unclean leader election for the checkpoint
+        // dirty offset to get ahead of the active segment base offset, but still be
+        // within the range of the log.
+
+        TopicPartition tp = new TopicPartition("foo", 0);
+
+        ConcurrentMap<TopicPartition, UnifiedLog> logs = new ConcurrentHashMap<>();
+        UnifiedLog log = createLog(2048, TopicConfig.CLEANUP_POLICY_COMPACT, tp);
+        logs.put(tp, log);
+
+        appendRecords(log, 3);
+        appendRecords(log, 3);
+
+        assertEquals(1, log.logSegments().size());
+        assertEquals(0L, log.activeSegment().baseOffset());
+
+        LogCleanerManagerMock cleanerManager = createCleanerManagerMock(logs);
+        cleanerManager.addCheckpoint(tp, 3L);
+
+        // These segments are uncleanable and hence not filthy
+        Optional<LogToClean> filthiestLog = cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats());
+        assertEquals(Optional.empty(), filthiestLog);
+    }
+
+    /**
+     * When checking for logs with segments ready for deletion
+     * we shouldn't consider logs where cleanup.policy=delete
+     * as they are handled by the LogManager
+     */
+    @Test
+    public void testLogsWithSegmentsToDeleteShouldNotConsiderCleanupPolicyDeleteLogs() throws IOException {
+        MemoryRecords records = LogTestUtils.singletonRecords("test".getBytes(), null);
+        UnifiedLog log = createLog(records.sizeInBytes() * 5, TopicConfig.CLEANUP_POLICY_DELETE, new TopicPartition("log", 0));
+        LogCleanerManager cleanerManager = createCleanerManager(log);
+
+        int readyToDelete = cleanerManager.deletableLogs().size();
+        assertEquals(0, readyToDelete, "should have 0 logs ready to be deleted");
+    }
+
+    /**
+     * We should find logs with segments ready to be deleted when cleanup.policy=compact,delete
+     */
+    @Test
+    public void testLogsWithSegmentsToDeleteShouldConsiderCleanupPolicyCompactDeleteLogs() throws IOException {
+        MemoryRecords records = LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes());
+        UnifiedLog log = createLog(records.sizeInBytes() * 5, TopicConfig.CLEANUP_POLICY_COMPACT + "," +
+            TopicConfig.CLEANUP_POLICY_DELETE, new TopicPartition("log", 0));
+        LogCleanerManager cleanerManager = createCleanerManager(log);
+
+        int readyToDelete = cleanerManager.deletableLogs().size();
+        assertEquals(1, readyToDelete, "should have 1 logs ready to be deleted");
+    }
+
+    /**
+     * When looking for logs with segments ready to be deleted we should consider
+     * logs with cleanup.policy=compact because they may have segments from before the log start offset
+     */
+    @Test
+    public void testLogsWithSegmentsToDeleteShouldConsiderCleanupPolicyCompactLogs() throws IOException {
+        MemoryRecords records = LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes());
+        UnifiedLog log = createLog(records.sizeInBytes() * 5, TopicConfig.CLEANUP_POLICY_COMPACT, new TopicPartition("log", 0));
+        LogCleanerManager cleanerManager = createCleanerManager(log);
+
+        int readyToDelete = cleanerManager.deletableLogs().size();
+        assertEquals(1, readyToDelete, "should have 1 logs ready to be deleted");
+    }
+
+    /**
+     * log under cleanup should be ineligible for compaction
+     */
+    @Test
+    public void testLogsUnderCleanupIneligibleForCompaction() throws IOException {
+        MemoryRecords records = LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes());
+        UnifiedLog log = createLog(records.sizeInBytes() * 5, TopicConfig.CLEANUP_POLICY_DELETE, new TopicPartition("log", 0));
+        LogCleanerManager cleanerManager = createCleanerManager(log);
+
+        log.appendAsLeader(records, 0);
+        log.roll();
+        log.appendAsLeader(LogTestUtils.singletonRecords("test2".getBytes(), "test2".getBytes()), 0);
+        log.updateHighWatermark(2L);
+
+        // simulate cleanup thread working on the log partition
+        List<Map.Entry<TopicPartition, UnifiedLog>> deletableLog = cleanerManager.pauseCleaningForNonCompactedPartitions();
+        assertEquals(1, deletableLog.size(), "should have 1 logs ready to be deleted");
+
+        // change cleanup policy from delete to compact
+        Properties logProps = new Properties();
+        logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, log.config().segmentSize());
+        logProps.put(TopicConfig.RETENTION_MS_CONFIG, log.config().retentionMs);
+        logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
+        logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, 0);
+        LogConfig config = new LogConfig(logProps);
+        log.updateConfig(config);
+
+        // log cleanup in progress, the log is not available for compaction
+        Optional<LogToClean> cleanable = cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats());
+        assertTrue(cleanable.isEmpty(), "should have 0 logs ready to be compacted");
+
+        // log cleanup finished, and log can be picked up for compaction
+        cleanerManager.resumeCleaning(deletableLog.stream().map(Map.Entry::getKey).collect(Collectors.toSet()));
+        Optional<LogToClean> cleanable2 = cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats());
+        assertTrue(cleanable2.isPresent(), "should have 1 logs ready to be compacted");
+
+        // update cleanup policy to delete
+        logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
+        LogConfig config2 = new LogConfig(logProps);
+        log.updateConfig(config2);
+
+        // compaction in progress, should have 0 log eligible for log cleanup
+        List<Map.Entry<TopicPartition, UnifiedLog>> deletableLog2 = cleanerManager.pauseCleaningForNonCompactedPartitions();
+        assertEquals(0, deletableLog2.size(), "should have 0 logs ready to be deleted");
+
+        // compaction done, should have 1 log eligible for log cleanup
+        cleanerManager.doneDeleting(List.of(cleanable2.get().topicPartition()));
+        List<Map.Entry<TopicPartition, UnifiedLog>> deletableLog3 = cleanerManager.pauseCleaningForNonCompactedPartitions();
+        assertEquals(1, deletableLog3.size(), "should have 1 logs ready to be deleted");
+    }
+
+    @Test
+    public void testUpdateCheckpointsShouldAddOffsetToPartition() throws IOException {
+        MemoryRecords records = LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes());
+        UnifiedLog log = createLog(records.sizeInBytes() * 5, TopicConfig.CLEANUP_POLICY_COMPACT, new TopicPartition("log", 0));
+        LogCleanerManager cleanerManager = createCleanerManager(log);
+
+        // expect the checkpoint offset is not the expectedOffset before doing updateCheckpoints
+        assertNotEquals(OFFSET, cleanerManager.allCleanerCheckpoints().getOrDefault(TOPIC_PARTITION, 0L));
+
+        cleanerManager.updateCheckpoints(logDir, Optional.of(Map.entry(TOPIC_PARTITION, OFFSET)), Optional.empty());
+        // expect the checkpoint offset is now updated to the expected offset after doing updateCheckpoints
+        assertEquals(OFFSET, cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION));
+    }
+
+    @Test
+    public void testUpdateCheckpointsShouldRemovePartitionData() throws IOException {
+        MemoryRecords records = LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes());
+        UnifiedLog log = createLog(records.sizeInBytes() * 5, TopicConfig.CLEANUP_POLICY_COMPACT, new TopicPartition("log", 0));
+        LogCleanerManager cleanerManager = createCleanerManager(log);
+
+        // write some data into the cleaner-offset-checkpoint file
+        cleanerManager.updateCheckpoints(logDir, Optional.of(Map.entry(TOPIC_PARTITION, OFFSET)), Optional.empty());
+        assertEquals(OFFSET, cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION));
+
+        // updateCheckpoints should remove the topicPartition data in the logDir
+        cleanerManager.updateCheckpoints(logDir, Optional.empty(), Optional.of(TOPIC_PARTITION));
+        assertFalse(cleanerManager.allCleanerCheckpoints().containsKey(TOPIC_PARTITION));
+    }
+
+    @Test
+    public void testHandleLogDirFailureShouldRemoveDirAndData() throws IOException {
+        MemoryRecords records = LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes());
+        UnifiedLog log = createLog(records.sizeInBytes() * 5, TopicConfig.CLEANUP_POLICY_COMPACT, new TopicPartition("log", 0));
+        LogCleanerManager cleanerManager = createCleanerManager(log);
+
+        // write some data into the cleaner-offset-checkpoint file in logDir and logDir2
+        cleanerManager.updateCheckpoints(logDir, Optional.of(Map.entry(TOPIC_PARTITION, OFFSET)), Optional.empty());
+        cleanerManager.updateCheckpoints(logDir2, Optional.of(Map.entry(TOPIC_PARTITION_2, OFFSET)), Optional.empty());
+        assertEquals(OFFSET, cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION));
+        assertEquals(OFFSET, cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION_2));
+
+        cleanerManager.handleLogDirFailure(logDir.getAbsolutePath());
+        // verify the partition data in logDir is gone, and data in logDir2 is still there
+        assertEquals(OFFSET, cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION_2));
+        assertFalse(cleanerManager.allCleanerCheckpoints().containsKey(TOPIC_PARTITION));
+    }
+
+    @Test
+    public void testMaybeTruncateCheckpointShouldTruncateData() throws IOException {
+        MemoryRecords records = LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes());
+        UnifiedLog log = createLog(records.sizeInBytes() * 5, TopicConfig.CLEANUP_POLICY_COMPACT, new TopicPartition("log", 0));
+        LogCleanerManager cleanerManager = createCleanerManager(log);
+        long lowerOffset = 1L;
+        long higherOffset = 1000L;
+
+        // write some data into the cleaner-offset-checkpoint file in logDir
+        cleanerManager.updateCheckpoints(logDir, Optional.of(Map.entry(TOPIC_PARTITION, OFFSET)), Optional.empty());
+        assertEquals(OFFSET, cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION));
+
+        // we should not truncate the checkpoint data for checkpointed offset <= the given offset (higherOffset)
+        cleanerManager.maybeTruncateCheckpoint(logDir, TOPIC_PARTITION, higherOffset);
+        assertEquals(OFFSET, cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION));
+
+        // we should truncate the checkpoint data for checkpointed offset > the given offset (lowerOffset)
+        cleanerManager.maybeTruncateCheckpoint(logDir, TOPIC_PARTITION, lowerOffset);
+        assertEquals(lowerOffset, cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION));
+    }
+
+    @Test
+    public void testAlterCheckpointDirShouldRemoveDataInSrcDirAndAddInNewDir() throws IOException {
+        MemoryRecords records = LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes());
+        UnifiedLog log = createLog(records.sizeInBytes() * 5, TopicConfig.CLEANUP_POLICY_COMPACT, new TopicPartition("log", 0));
+        LogCleanerManager cleanerManager = createCleanerManager(log);
+
+        // write some data into the cleaner-offset-checkpoint file in logDir
+        cleanerManager.updateCheckpoints(logDir, Optional.of(Map.entry(TOPIC_PARTITION, OFFSET)), Optional.empty());
+        assertEquals(OFFSET, cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION));
+
+        cleanerManager.alterCheckpointDir(TOPIC_PARTITION, logDir, logDir2);
+        // verify we still can get the partition offset after alterCheckpointDir
+        // This data should locate in logDir2, not logDir
+        assertEquals(OFFSET, cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION));
+
+        // force delete the logDir2 from checkpoints, so that the partition data should also be deleted
+        cleanerManager.handleLogDirFailure(logDir2.getAbsolutePath());
+        assertFalse(cleanerManager.allCleanerCheckpoints().containsKey(TOPIC_PARTITION));
+    }
+
+    /**
+     * Log under cleanup should still be eligible for log truncation.
+     */
+    @Test
+    public void testConcurrentLogCleanupAndLogTruncation() throws IOException {
+        MemoryRecords records = LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes());
+        UnifiedLog log = createLog(records.sizeInBytes() * 5, TopicConfig.CLEANUP_POLICY_DELETE, new TopicPartition("log", 0));
+        LogCleanerManager cleanerManager = createCleanerManager(log);
+
+        // log cleanup starts
+        List<Map.Entry<TopicPartition, UnifiedLog>> pausedPartitions = cleanerManager.pauseCleaningForNonCompactedPartitions();
+        // Log truncation happens due to unclean leader election
+        cleanerManager.abortAndPauseCleaning(log.topicPartition());
+        cleanerManager.resumeCleaning(Set.of(log.topicPartition()));
+        // log cleanup finishes and pausedPartitions are resumed
+        cleanerManager.resumeCleaning(pausedPartitions.stream().map(Map.Entry::getKey).collect(Collectors.toSet()));
+
+        assertEquals(Optional.empty(), cleanerManager.cleaningState(log.topicPartition()));
+    }
+
+    /**
+     * Log under cleanup should still be eligible for topic deletion.
+     */
+    @Test
+    public void testConcurrentLogCleanupAndTopicDeletion() throws IOException {
+        MemoryRecords records = LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes());
+        UnifiedLog log = createLog(records.sizeInBytes() * 5, TopicConfig.CLEANUP_POLICY_DELETE, new TopicPartition("log", 0));
+        LogCleanerManager cleanerManager = createCleanerManager(log);
+
+        // log cleanup starts
+        List<Map.Entry<TopicPartition, UnifiedLog>> pausedPartitions = cleanerManager.pauseCleaningForNonCompactedPartitions();
+        // Broker processes StopReplicaRequest with delete=true
+        cleanerManager.abortCleaning(log.topicPartition());
+        // log cleanup finishes and pausedPartitions are resumed
+        cleanerManager.resumeCleaning(pausedPartitions.stream().map(Map.Entry::getKey).collect(Collectors.toSet()));
+
+        assertEquals(Optional.empty(), cleanerManager.cleaningState(log.topicPartition()));
+    }
+
+    /**
+     * When looking for logs with segments ready to be deleted we shouldn't consider
+     * logs that have had their partition marked as uncleanable.
+     */
+    @Test
+    public void testLogsWithSegmentsToDeleteShouldNotConsiderUncleanablePartitions() throws IOException {
+        MemoryRecords records = LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes());
+        UnifiedLog log = createLog(records.sizeInBytes() * 5, TopicConfig.CLEANUP_POLICY_COMPACT, new TopicPartition("log", 0));
+        LogCleanerManager cleanerManager = createCleanerManager(log);
+        cleanerManager.markPartitionUncleanable(log.dir().getParent(), TOPIC_PARTITION);
+
+        int readyToDelete = cleanerManager.deletableLogs().size();
+        assertEquals(0, readyToDelete, "should have 0 logs ready to be deleted");
+    }
+
+    /**
+     * Test computation of cleanable range with no minimum compaction lag settings active where bounded by LSO.
+     */
+    @Test
+    public void testCleanableOffsetsForNone() throws IOException {
+        Properties logProps = new Properties();
+        logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024);
+        UnifiedLog log = makeLog(LogConfig.fromProps(LOG_CONFIG.originals(), logProps));
+
+        while (log.numberOfSegments() < 8)
+            log.appendAsLeader(records((int) log.logEndOffset(), (int) log.logEndOffset(), TIME.milliseconds()), 0);
+
+        log.updateHighWatermark(50);
+
+        Optional<Long> lastCleanOffset = Optional.of(0L);
+        OffsetsToClean cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, TIME.milliseconds());
+        assertEquals(0L, cleanableOffsets.firstDirtyOffset(), "The first cleanable offset starts at the beginning of the log.");
+        assertEquals(log.highWatermark(), log.lastStableOffset(),
+            "The high watermark equals the last stable offset as no transactions are in progress");
+        assertEquals(log.lastStableOffset(), cleanableOffsets.firstUncleanableDirtyOffset(),
+            "The first uncleanable offset is bounded by the last stable offset.");
+    }
+
+    /**
+     * Test computation of cleanable range with no minimum compaction lag settings active where bounded by active segment.
+     */
+    @Test
+    public void testCleanableOffsetsActiveSegment() throws IOException {
+        Properties logProps = new Properties();
+        logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024);
+        UnifiedLog log = makeLog(LogConfig.fromProps(LOG_CONFIG.originals(), logProps));
+
+        while (log.numberOfSegments() < 8)
+            log.appendAsLeader(records((int) log.logEndOffset(), (int) log.logEndOffset(), TIME.milliseconds()), 0);
+
+        log.updateHighWatermark(log.logEndOffset());
+
+        Optional<Long> lastCleanOffset = Optional.of(0L);
+        OffsetsToClean cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, TIME.milliseconds());
+
+        assertEquals(0L, cleanableOffsets.firstDirtyOffset(), "The first cleanable offset starts at the beginning of the log.");
+        assertEquals(log.activeSegment().baseOffset(), cleanableOffsets.firstUncleanableDirtyOffset(),
+            "The first uncleanable offset begins with the active segment.");
+    }
+
+    /**
+     * Test computation of cleanable range with a minimum compaction lag time
+     */
+    @Test
+    public void testCleanableOffsetsForTime() throws IOException {
+        int compactionLag = 60 * 60 * 1000;
+        Properties logProps = new Properties();
+        logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024);
+        logProps.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, compactionLag);
+        UnifiedLog log = makeLog(LogConfig.fromProps(LOG_CONFIG.originals(), logProps));
+
+        long t0 = TIME.milliseconds();
+        while (log.numberOfSegments() < 4)
+            log.appendAsLeader(records((int) log.logEndOffset(), (int) log.logEndOffset(), t0), 0);
+
+        LogSegment activeSegAtT0 = log.activeSegment();
+
+        TIME.sleep(compactionLag + 1);
+        long t1 = TIME.milliseconds();
+
+        while (log.numberOfSegments() < 8)
+            log.appendAsLeader(records((int) log.logEndOffset(), (int) log.logEndOffset(), t1), 0);
+
+        log.updateHighWatermark(log.logEndOffset());
+
+        Optional<Long> lastCleanOffset = Optional.of(0L);
+        OffsetsToClean cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, TIME.milliseconds());
+        assertEquals(0L, cleanableOffsets.firstDirtyOffset(), "The first cleanable offset starts at the beginning of the log.");
+        assertEquals(activeSegAtT0.baseOffset(), cleanableOffsets.firstUncleanableDirtyOffset(),
+            "The first uncleanable offset begins with the second block of log entries.");
+    }
+
+    /**
+     * Test computation of cleanable range with a minimum compaction lag time that is small enough that
+     * the active segment contains it.
+     */
+    @Test
+    public void testCleanableOffsetsForShortTime() throws IOException {
+        Properties logProps = new Properties();
+        logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024);
+        logProps.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, compactionLag);
+
+        UnifiedLog log = makeLog(LogConfig.fromProps(LOG_CONFIG.originals(), logProps));
+
+        long t0 = TIME.milliseconds();
+        while (log.numberOfSegments() < 8)
+            log.appendAsLeader(records((int) log.logEndOffset(), (int) log.logEndOffset(), t0), 0);
+
+        log.updateHighWatermark(log.logEndOffset());
+
+        TIME.sleep(compactionLag + 1);
+
+        Optional<Long> lastCleanOffset = Optional.of(0L);
+        OffsetsToClean cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, TIME.milliseconds());
+        assertEquals(0L, cleanableOffsets.firstDirtyOffset(), "The first cleanable offset starts at the beginning of the log.");
+        assertEquals(log.activeSegment().baseOffset(), cleanableOffsets.firstUncleanableDirtyOffset(),
+            "The first uncleanable offset begins with active segment.");
+    }
+
+    public void testCleanableOffsetsNeedsCheckpointReset() throws IOException {
+        TopicPartition tp = new TopicPartition("foo", 0);
+        ConcurrentMap<TopicPartition, UnifiedLog> logs = setupIncreasinglyFilthyLogs(List.of(tp));
+        logs.get(tp).maybeIncrementLogStartOffset(10L, LogStartOffsetIncrementReason.ClientRecordDeletion);
+
+        Optional<Long> lastCleanOffset = Optional.of(15L);
+        OffsetsToClean cleanableOffsets = LogCleanerManager.cleanableOffsets(logs.get(tp), lastCleanOffset, TIME.milliseconds());
+        assertFalse(cleanableOffsets.forceUpdateCheckpoint(), "Checkpoint offset should not be reset if valid");
+
+        logs.get(tp).maybeIncrementLogStartOffset(20L, LogStartOffsetIncrementReason.ClientRecordDeletion);
+        cleanableOffsets = LogCleanerManager.cleanableOffsets(logs.get(tp), lastCleanOffset, TIME.milliseconds());
+        assertTrue(cleanableOffsets.forceUpdateCheckpoint(), "Checkpoint offset needs to be reset if less than log start offset");
+
+        lastCleanOffset = Optional.of(25L);
+        cleanableOffsets = LogCleanerManager.cleanableOffsets(logs.get(tp), lastCleanOffset, TIME.milliseconds());
+        assertTrue(cleanableOffsets.forceUpdateCheckpoint(), "Checkpoint offset needs to be reset if greater than log end offset");
+    }
+
+    @Test
+    public void testUndecidedTransactionalDataNotCleanable() throws IOException {
+        int compactionLag = 60 * 60 * 1000;
+        Properties logProps = new Properties();
+        logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024);
+        logProps.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, compactionLag);
+        UnifiedLog log = makeLog(LogConfig.fromProps(LOG_CONFIG.originals(), logProps));
+
+        long producerId = 15L;
+        short producerEpoch = 0;
+        int sequence = 0;
+        log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence,
+            new SimpleRecord(TIME.milliseconds(), "1".getBytes(), "a".getBytes()),
+            new SimpleRecord(TIME.milliseconds(), "2".getBytes(), "b".getBytes())), 0);
+        log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence + 2,
+            new SimpleRecord(TIME.milliseconds(), "3".getBytes(), "c".getBytes())), 0);
+        log.roll();
+        log.updateHighWatermark(3L);
+
+        TIME.sleep(compactionLag + 1);
+        // although the compaction lag has been exceeded, the undecided data should not be cleaned
+        OffsetsToClean cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Optional.of(0L), TIME.milliseconds());
+        assertEquals(0L, cleanableOffsets.firstDirtyOffset());
+        assertEquals(0L, cleanableOffsets.firstUncleanableDirtyOffset());
+
+        log.appendAsLeader(MemoryRecords.withEndTransactionMarker(TIME.milliseconds(), producerId, producerEpoch,
+            new EndTransactionMarker(ControlRecordType.ABORT, 15)), 0, AppendOrigin.COORDINATOR, RequestLocal.noCaching(),
+            VerificationGuard.SENTINEL, TransactionVersion.TV_1.featureLevel());
+        log.roll();
+        log.updateHighWatermark(4L);
+
+        // the first segment should now become cleanable immediately
+        cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Optional.of(0L), TIME.milliseconds());
+        assertEquals(0L, cleanableOffsets.firstDirtyOffset());
+        assertEquals(3L, cleanableOffsets.firstUncleanableDirtyOffset());
+
+        TIME.sleep(compactionLag + 1);
+
+        // the second segment becomes cleanable after the compaction lag
+        cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Optional.of(0L), TIME.milliseconds());
+        assertEquals(0L, cleanableOffsets.firstDirtyOffset());
+        assertEquals(4L, cleanableOffsets.firstUncleanableDirtyOffset());
+    }
+
+    @Test
+    public void testDoneCleaning() throws IOException {
+        Properties logProps = new Properties();
+        logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024);
+        UnifiedLog log = makeLog(LogConfig.fromProps(LOG_CONFIG.originals(), logProps));
+
+        while (log.numberOfSegments() < 8)
+            log.appendAsLeader(records((int) log.logEndOffset(), (int) log.logEndOffset(), TIME.milliseconds()), 0);
+
+        LogCleanerManager cleanerManager = createCleanerManager(log);
+        assertThrows(IllegalStateException.class, () -> cleanerManager.doneCleaning(TOPIC_PARTITION, log.dir(), 1));
+
+        cleanerManager.setCleaningState(TOPIC_PARTITION, LogCleaningState.logCleaningPaused(1));
+        assertThrows(IllegalStateException.class, () -> cleanerManager.doneCleaning(TOPIC_PARTITION, log.dir(), 1));
+
+        cleanerManager.setCleaningState(TOPIC_PARTITION, LOG_CLEANING_IN_PROGRESS);
+        long endOffset = 1L;
+        cleanerManager.doneCleaning(TOPIC_PARTITION, log.dir(), endOffset);
+
+        assertTrue(cleanerManager.cleaningState(TOPIC_PARTITION).isEmpty());
+        assertTrue(cleanerManager.allCleanerCheckpoints().containsKey(TOPIC_PARTITION));
+        assertEquals(endOffset, cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION));
+
+        cleanerManager.setCleaningState(TOPIC_PARTITION, LOG_CLEANING_ABORTED);
+        cleanerManager.doneCleaning(TOPIC_PARTITION, log.dir(), endOffset);
+
+        assertEquals(LogCleaningState.logCleaningPaused(1), cleanerManager.cleaningState(TOPIC_PARTITION).get());
+        assertTrue(cleanerManager.allCleanerCheckpoints().containsKey(TOPIC_PARTITION));
+    }
+
+    @Test
+    public void testDoneDeleting() throws IOException {
+        MemoryRecords records = LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes());
+        UnifiedLog log = createLog(records.sizeInBytes() * 5, TopicConfig.CLEANUP_POLICY_COMPACT +
+            "," + TopicConfig.CLEANUP_POLICY_DELETE, new TopicPartition("log", 0));
+        LogCleanerManager cleanerManager = createCleanerManager(log);
+        TopicPartition tp = new TopicPartition("log", 0);
+
+        assertThrows(IllegalStateException.class, () -> cleanerManager.doneDeleting(List.of(tp)));
+
+        cleanerManager.setCleaningState(tp, LogCleaningState.logCleaningPaused(1));
+        assertThrows(IllegalStateException.class, () -> cleanerManager.doneDeleting(List.of(tp)));
+
+        cleanerManager.setCleaningState(tp, LOG_CLEANING_IN_PROGRESS);
+        cleanerManager.doneDeleting(List.of(tp));
+        assertTrue(cleanerManager.cleaningState(tp).isEmpty());
+
+        cleanerManager.setCleaningState(tp, LOG_CLEANING_ABORTED);
+        cleanerManager.doneDeleting(List.of(tp));
+        assertEquals(LogCleaningState.logCleaningPaused(1), cleanerManager.cleaningState(tp).get());
+    }
+
+    /**
+     * Logs with invalid checkpoint offsets should update their checkpoint offset even if the log doesn't need cleaning.
+     */
+    @Test
+    public void testCheckpointUpdatedForInvalidOffsetNoCleaning() throws IOException {
+        TopicPartition tp = new TopicPartition("foo", 0);
+        ConcurrentMap<TopicPartition, UnifiedLog> logs = setupIncreasinglyFilthyLogs(List.of(tp));
+
+        logs.get(tp).maybeIncrementLogStartOffset(20L, LogStartOffsetIncrementReason.ClientRecordDeletion);
+        LogCleanerManagerMock cleanerManager = createCleanerManagerMock(logs);
+        cleanerManager.addCheckpoint(tp, 15L);
+
+        Optional<LogToClean> filthiestLog = cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats());
+        assertEquals(Optional.empty(), filthiestLog, "Log should not be selected for cleaning");
+        assertEquals(20L, cleanerManager.checkpointOffset(tp), "Unselected log should have checkpoint offset updated");
+    }
+
+    /**
+     * Logs with invalid checkpoint offsets should update their checkpoint offset even if they aren't selected
+     * for immediate cleaning.
+     */
+    @Test
+    public void testCheckpointUpdatedForInvalidOffsetNotSelected() throws IOException {
+        TopicPartition tp0 = new TopicPartition("foo", 0);
+        TopicPartition tp1 = new TopicPartition("foo", 1);
+        List<TopicPartition> partitions = List.of(tp0, tp1);
+
+        // create two logs, one with an invalid offset, and one that is dirtier than the log with an invalid offset
+        ConcurrentMap<TopicPartition, UnifiedLog> logs = setupIncreasinglyFilthyLogs(partitions);
+        logs.get(tp0).maybeIncrementLogStartOffset(15L, LogStartOffsetIncrementReason.ClientRecordDeletion);
+        LogCleanerManagerMock cleanerManager = createCleanerManagerMock(logs);
+        cleanerManager.addCheckpoint(tp0, 10L);
+        cleanerManager.addCheckpoint(tp1, 5L);
+
+        LogToClean filthiestLog = cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats()).get();
+        assertEquals(tp1, filthiestLog.topicPartition(), "Dirtier log should be selected");
+        assertEquals(15L, cleanerManager.checkpointOffset(tp0), "Unselected log should have checkpoint offset updated");
+    }
+
+    private LogCleanerManager createCleanerManager(UnifiedLog log) {
+        ConcurrentMap<TopicPartition, UnifiedLog> logs = new ConcurrentHashMap<>();
+        logs.put(TOPIC_PARTITION, log);
+
+        return new LogCleanerManager(List.of(logDir, logDir2), logs, null);
+    }
+
+    private LogCleanerManagerMock createCleanerManagerMock(ConcurrentMap<TopicPartition, UnifiedLog> pool) {
+        return new LogCleanerManagerMock(List.of(logDir), pool, null);
+    }
+
+    private UnifiedLog createLog(int segmentSize, String cleanupPolicy, TopicPartition topicPartition) throws IOException {
+        LogConfig config = createLowRetentionLogConfig(segmentSize, cleanupPolicy);
+        File partitionDir = new File(logDir, UnifiedLog.logDirName(topicPartition));
+
+        return UnifiedLog.create(partitionDir, config, 0L, 0L, TIME.scheduler, new BrokerTopicStats(), TIME, 5 * 60 * 1000,
+            PRODUCER_STATE_MANAGER_CONFIG, PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, new LogDirFailureChannel(10),
+            true, Optional.empty());
+    }
+
+    private LogConfig createLowRetentionLogConfig(int segmentSize, String cleanupPolicy) {
+        Properties logProps = new Properties();
+        logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, segmentSize);
+        logProps.put(TopicConfig.RETENTION_MS_CONFIG, 1);
+        logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, cleanupPolicy);
+        logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, 0.05); // small for easier and clearer tests
+
+        return new LogConfig(logProps);
+    }
+
+    private void writeRecords(UnifiedLog log, int numBatches, int recordsPerBatch, int batchesPerSegment) throws IOException {
+        for (int i = 0; i < numBatches; i++) {
+            appendRecords(log, recordsPerBatch);
+            if (i % batchesPerSegment == 0)
+                log.roll();
+        }
+        log.roll();
+    }
+
+    private void appendRecords(UnifiedLog log, int numRecords) throws IOException {
+        long startOffset = log.logEndOffset();
+        long endOffset = startOffset + numRecords;
+
+        SimpleRecord[] records = IntStream.range((int) startOffset, (int) endOffset)
+            .mapToObj(offset -> new SimpleRecord(TIME.milliseconds(), String.format("key-%d", offset).getBytes(),
+                String.format("value-%d", offset).getBytes()))
+            .toArray(SimpleRecord[]::new);
+
+        log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, records), 1);
+        log.maybeIncrementHighWatermark(log.logEndOffsetMetadata());
+    }
+
+    private UnifiedLog makeLog(LogConfig config) throws IOException {
+        return UnifiedLog.create(logDir, config, 0L, 0L, TIME.scheduler, new BrokerTopicStats(), TIME, 5 * 60 * 1000,
+            PRODUCER_STATE_MANAGER_CONFIG, PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, new LogDirFailureChannel(10),
+            true, Optional.empty());
+    }
+
+    private MemoryRecords records(int key, int value, long timestamp) {
+        return MemoryRecords.withRecords(Compression.NONE, new SimpleRecord(timestamp, Integer.toString(key).getBytes(),
+            Integer.toString(value).getBytes()));
+    }
+
+    private static LogConfig createLogConfig() {
+        Properties logProps = new Properties();
+        logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024);
+        logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1024);
+        logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
+
+        return new LogConfig(logProps);
+    }
+}
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
index 6a89b82043d..21643c9186f 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
@@ -16,17 +16,25 @@
  */
 package org.apache.kafka.storage.internals.log;
 
+import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.DefaultRecordBatch;
 import org.apache.kafka.common.record.EndTransactionMarker;
 import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.server.common.RequestLocal;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class LogTestUtils {
@@ -73,6 +81,39 @@ public class LogTestUtils {
         return MemoryRecords.withEndTransactionMarker(offset, timestamp, partitionLeaderEpoch, producerId, epoch, marker);
     }
 
+    /**
+     * Wrap a single record log buffer.
+     */
+    public static MemoryRecords singletonRecords(byte[] value, byte[] key) {
+        return records(
+            List.of(new SimpleRecord(RecordBatch.NO_TIMESTAMP, key, value)),
+            RecordBatch.CURRENT_MAGIC_VALUE,
+            Compression.NONE,
+            RecordBatch.NO_PRODUCER_ID,
+            RecordBatch.NO_PRODUCER_EPOCH,
+            RecordBatch.NO_SEQUENCE,
+            0L,
+            RecordBatch.NO_PARTITION_LEADER_EPOCH
+        );
+    }
+
+    public static MemoryRecords records(List<SimpleRecord> records,
+                                        byte magicValue,
+                                        Compression codec,
+                                        long producerId,
+                                        short producerEpoch,
+                                        int sequence,
+                                        long baseOffset,
+                                        int partitionLeaderEpoch) {
+        ByteBuffer buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records));
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, baseOffset,
+            System.currentTimeMillis(), producerId, producerEpoch, sequence, false, partitionLeaderEpoch);
+
+        records.forEach(builder::append);
+
+        return builder.build();
+    }
+
     public static class LogConfigBuilder {
         private final Map<String, Object> configs = new HashMap<>();
 
