This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 258ed5a  KAFKA-12155: Metadata log and snapshot cleaning #10864
258ed5a is described below

commit 258ed5a744d7a1d6a60c21844d0383dc7ca9ac8c
Author: David Arthur <[email protected]>
AuthorDate: Mon Jun 7 16:25:28 2021 -0400

    KAFKA-12155: Metadata log and snapshot cleaning #10864
    
    This PR includes changes to KafkaRaftClient and KafkaMetadataLog to support 
periodic
    cleaning of old log segments and snapshots.
    
    Four new public config keys are introduced: metadata.log.segment.bytes,
    metadata.log.segment.ms, metadata.max.retention.bytes, and
    metadata.max.retention.ms.
    
    These are used to configure the log layer as well as the snapshot cleaning 
logic. Snapshot
    and log cleaning is performed based on two criteria: total metadata log + 
snapshot size
    (metadata.max.retention.bytes), and max age of a snapshot 
(metadata.max.retention.ms).
    Since we have a requirement that the log start offset must always align 
with a snapshot,
    we perform the cleaning on snapshots first and then clean what logs we can.
    
    The cleaning algorithm is as follows:
    1. Delete the oldest snapshot.
    2. Advance the log start offset to the new oldest snapshot.
    3. Request that the log layer clean any segments prior to the new log start 
offset.
    4. Repeat this until the retention size or time is no longer violated, or 
only a single
    snapshot remains.
    
    The cleaning process is triggered every 60 seconds from the KafkaRaftClient 
polling
    thread.
    
    Reviewers: José Armando García Sancio <[email protected]>, dengziming 
<[email protected]>, Colin P. McCabe <[email protected]>
---
 .../main/scala/kafka/raft/KafkaMetadataLog.scala   | 227 ++++++++++++---
 core/src/main/scala/kafka/raft/RaftManager.scala   |   3 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  25 +-
 .../scala/kafka/raft/KafkaMetadataLogTest.scala    | 321 ++++++++++++---------
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |   4 +
 .../org/apache/kafka/raft/KafkaRaftClient.java     |  44 ++-
 .../java/org/apache/kafka/raft/ReplicatedLog.java  |   9 +-
 .../java/org/apache/kafka/snapshot/Snapshots.java  |   8 +-
 .../kafka/raft/KafkaRaftClientSnapshotTest.java    |   9 +-
 .../test/java/org/apache/kafka/raft/MockLog.java   |   5 +
 10 files changed, 470 insertions(+), 185 deletions(-)

diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala 
b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index 2a9e7a1..df8e1af 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -16,34 +16,37 @@
  */
 package kafka.raft
 
-import java.io.File
-import java.nio.file.{Files, NoSuchFileException, Path}
-import java.util.{Optional, Properties}
 import kafka.api.ApiVersion
-import kafka.log.{AppendOrigin, Log, LogConfig, LogOffsetSnapshot, 
SnapshotGenerated}
-import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, 
LogDirFailureChannel, RequestLocal}
+import kafka.log.{AppendOrigin, Defaults, Log, LogConfig, LogOffsetSnapshot, 
SnapshotGenerated}
+import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, 
KafkaConfig, LogDirFailureChannel, RequestLocal}
 import kafka.utils.{CoreUtils, Logging, Scheduler}
-import org.apache.kafka.common.record.{MemoryRecords, Records}
-import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.config.AbstractConfig
+import org.apache.kafka.common.record.{ControlRecordUtils, MemoryRecords, 
Records}
+import org.apache.kafka.common.utils.{BufferSupplier, Time}
 import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.raft.{Isolation, LogAppendInfo, LogFetchInfo, 
LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, 
ValidOffsetAndEpoch}
 import org.apache.kafka.snapshot.{FileRawSnapshotReader, 
FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, 
Snapshots}
 
+import java.io.File
+import java.nio.file.{Files, NoSuchFileException, Path}
+import java.util.{Optional, Properties}
 import scala.annotation.nowarn
 import scala.collection.mutable
 import scala.compat.java8.OptionConverters._
 
 final class KafkaMetadataLog private (
-  log: Log,
+  val log: Log,
+  time: Time,
   scheduler: Scheduler,
   // Access to this object needs to be synchronized because it is used by the 
snapshotting thread to notify the
   // polling thread when snapshots are created. This object is also used to 
store any opened snapshot reader.
   snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]],
   topicPartition: TopicPartition,
-  maxFetchSizeInBytes: Int,
-  val fileDeleteDelayMs: Long // Visible for testing,
+  config: MetadataLogConfig
 ) extends ReplicatedLog with Logging {
 
+  this.logIdent = s"[MetadataLog partition=$topicPartition, 
nodeId=${config.nodeId}] "
+
   override def read(startOffset: Long, readIsolation: Isolation): LogFetchInfo 
= {
     val isolation = readIsolation match {
       case Isolation.COMMITTED => FetchHighWatermark
@@ -52,7 +55,7 @@ final class KafkaMetadataLog private (
     }
 
     val fetchInfo = log.read(startOffset,
-      maxLength = maxFetchSizeInBytes,
+      maxLength = config.maxFetchSizeInBytes,
       isolation = isolation,
       minOneMessage = true)
 
@@ -91,10 +94,6 @@ final class KafkaMetadataLog private (
   private def handleAndConvertLogAppendInfo(appendInfo: 
kafka.log.LogAppendInfo): LogAppendInfo = {
     appendInfo.firstOffset match {
       case Some(firstOffset) =>
-        if (firstOffset.relativePositionInSegment == 0) {
-          // Assume that a new segment was created if the relative position is 0
-          log.deleteOldSegments()
-        }
         new LogAppendInfo(firstOffset.messageOffset, appendInfo.lastOffset)
       case None =>
         throw new KafkaException(s"Append failed unexpectedly: 
${appendInfo.errorMessage}")
@@ -312,29 +311,155 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): 
Boolean = {
+  /**
+   * Delete snapshots that come before a given snapshot ID. This is done by 
advancing the log start offset to the given
+   * snapshot and cleaning old log segments.
+   *
+   * This will only happen if the following invariants all hold true:
+   *
+   * <li>The given snapshot precedes the latest snapshot</li>
+   * <li>The offset of the given snapshot is greater than the log start 
offset</li>
+   * <li>The log layer can advance the offset to the given snapshot</li>
+   *
+   * This method is thread-safe
+   */
+  override def deleteBeforeSnapshot(snapshotId: OffsetAndEpoch): Boolean = {
     val (deleted, forgottenSnapshots) = snapshots synchronized {
       latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, 
SnapshotGenerated)) =>
-
-          // Delete all segments that have a "last offset" less than the log 
start offset
-          log.deleteOldSegments()
-
-          // Forget snapshots less than the log start offset
-          (true, forgetSnapshotsBefore(logStartSnapshotId))
+        case Some(latestSnapshotId) if
+          snapshots.contains(snapshotId) &&
+          startOffset < snapshotId.offset &&
+          snapshotId.offset <= latestSnapshotId.offset &&
+          log.maybeIncrementLogStartOffset(snapshotId.offset, 
SnapshotGenerated) =>
+            // Delete all segments that have a "last offset" less than the log 
start offset
+            log.deleteOldSegments()
+            // Remove older snapshots from the snapshots cache
+            (true, forgetSnapshotsBefore(snapshotId))
         case _ =>
-          (false, mutable.TreeMap.empty[OffsetAndEpoch, 
Option[FileRawSnapshotReader]])
+            (false, mutable.TreeMap.empty[OffsetAndEpoch, 
Option[FileRawSnapshotReader]])
       }
     }
-
     removeSnapshots(forgottenSnapshots)
     deleted
   }
 
   /**
+   * Force all known snapshots to have an open reader so we can know their 
sizes. This method is not thread-safe.
+   */
+  private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {
+    snapshots.keys.toSeq.flatMap {
+      snapshotId => readSnapshot(snapshotId).asScala.map { reader => 
(snapshotId, reader.sizeInBytes())}
+    }
+  }
+
+  /**
+   * Return the max timestamp of the first batch in a snapshot, if the 
snapshot exists and has records
+   */
+  private def readSnapshotTimestamp(snapshotId: OffsetAndEpoch): Option[Long] 
= {
+    readSnapshot(snapshotId).asScala.flatMap { reader =>
+      val batchIterator = reader.records().batchIterator()
+
+      val firstBatch = batchIterator.next()
+      val records = firstBatch.streamingIterator(new 
BufferSupplier.GrowableBufferSupplier())
+      if (firstBatch.isControlBatch) {
+        val header = 
ControlRecordUtils.deserializedSnapshotHeaderRecord(records.next());
+        Some(header.lastContainedLogTimestamp())
+      } else {
+        warn("Did not find control record at beginning of snapshot")
+        None
+      }
+    }
+  }
+
+  /**
+   * Perform cleaning of old snapshots and log segments based on size.
+   *
+   * If our configured retention size has been violated, we perform cleaning 
as follows:
+   *
+   * <li>Find oldest snapshot and delete it</li>
+   * <li>Advance log start offset to end of next oldest snapshot</li>
+   * <li>Delete log segments which wholly precede the new log start offset</li>
+   *
+   * This process is repeated until the retention size is no longer violated, 
or until only
+   * a single snapshot remains.
+   */
+  override def maybeClean(): Boolean = {
+    snapshots synchronized {
+      var didClean = false
+      didClean |= cleanSnapshotsRetentionSize()
+      didClean |= cleanSnapshotsRetentionMs()
+      didClean
+    }
+  }
+
+  /**
+   * Iterate through the snapshots and test the given predicate to see if we 
should attempt to delete it. Since
+   * we have some additional invariants regarding snapshots and log segments 
we cannot simply delete a snapshot in
+   * all cases.
+   *
+   * For the given predicate, we are testing if the snapshot identified by the 
first argument should be deleted.
+   */
+  private def cleanSnapshots(predicate: (OffsetAndEpoch) => Boolean): Boolean 
= {
+    if (snapshots.size < 2)
+      return false
+
+    var didClean = false
+    snapshots.keys.toSeq.sliding(2).toSeq.takeWhile {
+      case Seq(snapshot: OffsetAndEpoch, nextSnapshot: OffsetAndEpoch) =>
+        if (predicate(snapshot) && deleteBeforeSnapshot(nextSnapshot)) {
+          didClean = true
+          true
+        } else {
+          false
+        }
+      case _ => false // Shouldn't get here with the sliding window
+    }
+    didClean
+  }
+
+  private def cleanSnapshotsRetentionMs(): Boolean = {
+    if (config.retentionMillis < 0)
+      return false
+
+    // Keep deleting snapshots as long as the oldest snapshot is older than the retention time
+    def shouldClean(snapshotId: OffsetAndEpoch): Boolean = {
+      val now = time.milliseconds()
+      readSnapshotTimestamp(snapshotId).exists { timestamp =>
+        if (now - timestamp > config.retentionMillis) {
+          true
+        } else {
+          false
+        }
+      }
+    }
+
+    cleanSnapshots(shouldClean)
+  }
+
+  private def cleanSnapshotsRetentionSize(): Boolean = {
+    if (config.retentionMaxBytes < 0)
+      return false
+
+    val snapshotSizes = loadSnapshotSizes().toMap
+
+    var snapshotTotalSize: Long = snapshotSizes.values.sum
+
+    // Keep deleting snapshots and segments as long as we exceed the retention 
size
+    def shouldClean(snapshotId: OffsetAndEpoch): Boolean = {
+      snapshotSizes.get(snapshotId).exists { snapshotSize =>
+        if (log.size + snapshotTotalSize > config.retentionMaxBytes) {
+          snapshotTotalSize -= snapshotSize
+          true
+        } else {
+          false
+        }
+      }
+    }
+
+    cleanSnapshots(shouldClean)
+  }
+
+  /**
    * Forget the snapshots earlier than a given snapshot id and return the 
associated
    * snapshot readers.
    *
@@ -358,6 +483,7 @@ final class KafkaMetadataLog private (
     expiredSnapshots: mutable.TreeMap[OffsetAndEpoch, 
Option[FileRawSnapshotReader]]
   ): Unit = {
     expiredSnapshots.foreach { case (snapshotId, _) =>
+      info(s"Marking snapshot $snapshotId for deletion")
       Snapshots.markForDelete(log.dir.toPath, snapshotId)
     }
 
@@ -365,7 +491,7 @@ final class KafkaMetadataLog private (
       scheduler.schedule(
         "delete-snapshot-files",
         KafkaMetadataLog.deleteSnapshotFiles(log.dir.toPath, expiredSnapshots, 
this),
-        fileDeleteDelayMs
+        config.fileDeleteDelayMs
       )
     }
   }
@@ -377,22 +503,53 @@ final class KafkaMetadataLog private (
       snapshots.clear()
     }
   }
+
+  private[raft] def snapshotCount(): Int = {
+    snapshots synchronized {
+      snapshots.size
+    }
+  }
 }
 
-object KafkaMetadataLog {
+object MetadataLogConfig {
+  def apply(config: AbstractConfig, maxBatchSizeInBytes: Int, 
maxFetchSizeInBytes: Int): MetadataLogConfig = {
+    new MetadataLogConfig(
+      config.getInt(KafkaConfig.MetadataLogSegmentBytesProp),
+      config.getLong(KafkaConfig.MetadataLogSegmentMillisProp),
+      config.getLong(KafkaConfig.MetadataMaxRetentionBytesProp),
+      config.getLong(KafkaConfig.MetadataMaxRetentionMillisProp),
+      maxBatchSizeInBytes,
+      maxFetchSizeInBytes,
+      Defaults.FileDeleteDelayMs,
+      config.getInt(KafkaConfig.NodeIdProp)
+    )
+  }
+}
+
+case class MetadataLogConfig(logSegmentBytes: Int,
+                             logSegmentMillis: Long,
+                             retentionMaxBytes: Long,
+                             retentionMillis: Long,
+                             maxBatchSizeInBytes: Int,
+                             maxFetchSizeInBytes: Int,
+                             fileDeleteDelayMs: Int,
+                             nodeId: Int)
 
+object KafkaMetadataLog {
   def apply(
     topicPartition: TopicPartition,
     topicId: Uuid,
     dataDir: File,
     time: Time,
     scheduler: Scheduler,
-    maxBatchSizeInBytes: Int,
-    maxFetchSizeInBytes: Int
+    config: MetadataLogConfig
   ): KafkaMetadataLog = {
     val props = new Properties()
-    props.put(LogConfig.MaxMessageBytesProp, maxBatchSizeInBytes.toString)
+    props.put(LogConfig.MaxMessageBytesProp, 
config.maxBatchSizeInBytes.toString)
     props.put(LogConfig.MessageFormatVersionProp, 
ApiVersion.latestVersion.toString)
+    props.put(LogConfig.SegmentBytesProp, Int.box(config.logSegmentBytes))
+    props.put(LogConfig.SegmentMsProp, Long.box(config.logSegmentMillis))
+    props.put(LogConfig.FileDeleteDelayMsProp, 
Int.box(Defaults.FileDeleteDelayMs))
 
     LogConfig.validateValues(props)
     val defaultLogConfig = LogConfig(props)
@@ -415,11 +572,11 @@ object KafkaMetadataLog {
 
     val metadataLog = new KafkaMetadataLog(
       log,
+      time,
       scheduler,
       recoverSnapshots(log),
       topicPartition,
-      maxFetchSizeInBytes,
-      defaultLogConfig.fileDeleteDelayMs
+      config
     )
 
     // When recovering, truncate fully if the latest snapshot is after the log 
end offset. This can happen to a follower
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala 
b/core/src/main/scala/kafka/raft/RaftManager.scala
index a1dcf64..1ab63b8 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -254,8 +254,7 @@ class KafkaRaftManager[T](
       dataDir,
       time,
       scheduler,
-      maxBatchSizeInBytes = KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
-      maxFetchSizeInBytes = KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+      config = MetadataLogConfig(config, KafkaRaftClient.MAX_BATCH_SIZE_BYTES, 
KafkaRaftClient.MAX_FETCH_SIZE_BYTES)
     )
   }
 
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index fa8ddf0..6a3c97b 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -378,6 +378,10 @@ object KafkaConfig {
   val MetadataSnapshotMaxNewRecordBytesProp = 
"metadata.log.max.record.bytes.between.snapshots"
   val ControllerListenerNamesProp = "controller.listener.names"
   val SaslMechanismControllerProtocolProp = 
"sasl.mechanism.controller.protocol"
+  val MetadataLogSegmentBytesProp = "metadata.log.segment.bytes"
+  val MetadataLogSegmentMillisProp = "metadata.log.segment.ms"
+  val MetadataMaxRetentionBytesProp = "metadata.max.retention.bytes"
+  val MetadataMaxRetentionMillisProp = "metadata.max.retention.ms"
 
   /************* Authorizer Configuration ***********/
   val AuthorizerClassNameProp = "authorizer.class.name"
@@ -670,6 +674,12 @@ object KafkaConfig {
   val ControllerListenerNamesDoc = "A comma-separated list of the names of the 
listeners used by the controller. This is required " +
     "if running in KRaft mode. The ZK-based controller will not use this 
configuration."
   val SaslMechanismControllerProtocolDoc = "SASL mechanism used for 
communication with controllers. Default is GSSAPI."
+  val MetadataLogSegmentBytesDoc = "The maximum size of a single metadata log 
file."
+  val MetadataLogSegmentMillisDoc = "The maximum time before a new metadata 
log file is rolled out (in milliseconds)."
+  val MetadataMaxRetentionBytesDoc = "The maximum combined size of the 
metadata log and snapshots before deleting old " +
+    "snapshots and log files. Since at least one snapshot must exist before 
any logs can be deleted, this is a soft limit."
+  val MetadataMaxRetentionMillisDoc = "The number of milliseconds to keep a 
metadata log file or snapshot before " +
+    "deleting it. Since at least one snapshot must exist before any logs can 
be deleted, this is a soft limit."
 
   /************* Authorizer Configuration ***********/
   val AuthorizerClassNameDoc = s"The fully qualified name of a class that 
implements s${classOf[Authorizer].getName}" +
@@ -1048,14 +1058,21 @@ object KafkaConfig {
        */
       .define(MetadataSnapshotMaxNewRecordBytesProp, LONG, 
Defaults.MetadataSnapshotMaxNewRecordBytes, atLeast(1), HIGH, 
MetadataSnapshotMaxNewRecordBytesDoc)
 
+      /*
+       * KRaft mode private configs. Note that these configs are defined as 
internal. We will make them public in the 3.0.0 release.
+       */
       .define(ProcessRolesProp, LIST, Collections.emptyList(), 
ValidList.in("broker", "controller"), HIGH, ProcessRolesDoc)
       .define(NodeIdProp, INT, Defaults.EmptyNodeId, null, HIGH, NodeIdDoc)
       .define(InitialBrokerRegistrationTimeoutMsProp, INT, 
Defaults.InitialBrokerRegistrationTimeoutMs, null, MEDIUM, 
InitialBrokerRegistrationTimeoutMsDoc)
       .define(BrokerHeartbeatIntervalMsProp, INT, 
Defaults.BrokerHeartbeatIntervalMs, null, MEDIUM, BrokerHeartbeatIntervalMsDoc)
       .define(BrokerSessionTimeoutMsProp, INT, 
Defaults.BrokerSessionTimeoutMs, null, MEDIUM, BrokerSessionTimeoutMsDoc)
-      .define(MetadataLogDirProp, STRING, null, null, HIGH, MetadataLogDirDoc)
       .define(ControllerListenerNamesProp, STRING, null, null, HIGH, 
ControllerListenerNamesDoc)
       .define(SaslMechanismControllerProtocolProp, STRING, 
SaslConfigs.DEFAULT_SASL_MECHANISM, null, HIGH, 
SaslMechanismControllerProtocolDoc)
+      .define(MetadataLogDirProp, STRING, null, null, HIGH, MetadataLogDirDoc)
+      .define(MetadataLogSegmentBytesProp, INT, Defaults.LogSegmentBytes, 
atLeast(Records.LOG_OVERHEAD), HIGH, MetadataLogSegmentBytesDoc)
+      .define(MetadataLogSegmentMillisProp, LONG, Defaults.LogRollHours * 60 * 
60 * 1000L, null, HIGH, MetadataLogSegmentMillisDoc)
+      .define(MetadataMaxRetentionBytesProp, LONG, Defaults.LogRetentionBytes, 
null, HIGH, MetadataMaxRetentionBytesDoc)
+      .define(MetadataMaxRetentionMillisProp, LONG, Defaults.LogRetentionHours 
* 60 * 60 * 1000L, null, HIGH, MetadataMaxRetentionMillisDoc)
 
       /************* Authorizer Configuration ***********/
       .define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName, 
LOW, AuthorizerClassNameDoc)
@@ -1523,6 +1540,12 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean, dynamicConfigO
     }
   }
 
+  def metadataLogSegmentBytes = getInt(KafkaConfig.MetadataLogSegmentBytesProp)
+  def metadataLogSegmentMillis = 
getLong(KafkaConfig.MetadataLogSegmentMillisProp)
+  def metadataRetentionBytes = 
getLong(KafkaConfig.MetadataMaxRetentionBytesProp)
+  def metadataRetentionMillis = 
getLong(KafkaConfig.MetadataMaxRetentionMillisProp)
+
+
   def numNetworkThreads = getInt(KafkaConfig.NumNetworkThreadsProp)
   def backgroundThreads = getInt(KafkaConfig.BackgroundThreadsProp)
   val queuedMaxRequests = getInt(KafkaConfig.QueuedMaxRequestsProp)
diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala 
b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
index 1d0c8d8..fb477f3 100644
--- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
+++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -16,25 +16,27 @@
  */
 package kafka.raft
 
-import java.io.File
-import java.nio.ByteBuffer
-import java.nio.file.{Files, Path}
-import java.util.{Collections, Optional}
-import kafka.log.Log
+import kafka.log.{Defaults, Log, SegmentDeletion}
 import kafka.server.KafkaRaftServer
 import kafka.utils.{MockTime, TestUtils}
-import org.apache.kafka.common.errors.{OffsetOutOfRangeException, 
RecordTooLargeException}
+import org.apache.kafka.common.errors.RecordTooLargeException
 import org.apache.kafka.common.protocol
 import org.apache.kafka.common.protocol.{ObjectSerializationCache, Writable}
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, 
SimpleRecord}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.raft.internals.BatchBuilder
-import org.apache.kafka.raft.{KafkaRaftClient, LogAppendInfo, 
LogOffsetMetadata, OffsetAndEpoch, ReplicatedLog, ValidOffsetAndEpoch}
+import org.apache.kafka.raft._
 import org.apache.kafka.server.common.serialization.RecordSerde
-import org.apache.kafka.snapshot.{SnapshotPath, Snapshots}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertNotEquals, assertThrows, assertTrue}
+import org.apache.kafka.snapshot.{RawSnapshotReader, RawSnapshotWriter, 
SnapshotPath, Snapshots}
+import org.apache.kafka.test.TestUtils.assertOptional
+import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 
+import java.io.File
+import java.nio.ByteBuffer
+import java.nio.file.{Files, Path}
+import java.util.{Collections, Optional}
+
 final class KafkaMetadataLogTest {
   import KafkaMetadataLogTest._
 
@@ -158,27 +160,42 @@ final class KafkaMetadataLogTest {
   def testCreateSnapshotBeforeLogStartOffset(): Unit = {
     val numberOfRecords = 10
     val epoch = 1
-    val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
+    val snapshotId = new OffsetAndEpoch(numberOfRecords-4, epoch)
     val log = buildMetadataLog(tempDir, mockTime)
 
     append(log, numberOfRecords, epoch)
     log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
-
     TestUtils.resource(log.createNewSnapshot(snapshotId).get()) { snapshot =>
       snapshot.freeze()
     }
 
-    assertTrue(log.deleteBeforeSnapshot(snapshotId))
-    assertEquals(snapshotId.offset, log.startOffset)
+    // Simulate log cleanup that advances the LSO
+    log.log.maybeIncrementLogStartOffset(snapshotId.offset - 1, 
SegmentDeletion)
 
     assertThrows(
       classOf[IllegalArgumentException],
-      () => log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset - 1, 
snapshotId.epoch))
+      () => log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset - 2, 
snapshotId.epoch))
     )
   }
 
   @Test
-  def testCreateSnapshotMuchEalierEpoch(): Unit = {
+  def testCreateSnapshotDivergingEpoch(): Unit = {
+    val numberOfRecords = 10
+    val epoch = 2
+    val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    append(log, numberOfRecords, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
+
+    assertThrows(
+      classOf[IllegalArgumentException],
+      () => log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset, 
snapshotId.epoch - 1))
+    )
+  }
+
+  @Test
+  def testCreateSnapshotOlderEpoch(): Unit = {
     val numberOfRecords = 10
     val epoch = 2
     val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
@@ -191,9 +208,6 @@ final class KafkaMetadataLogTest {
       snapshot.freeze()
     }
 
-    assertTrue(log.deleteBeforeSnapshot(snapshotId))
-    assertEquals(snapshotId.offset, log.startOffset)
-
     assertThrows(
       classOf[IllegalArgumentException],
       () => log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset, 
snapshotId.epoch - 1))
@@ -243,9 +257,8 @@ final class KafkaMetadataLogTest {
       snapshot.freeze()
     }
 
-    assertTrue(log.deleteBeforeSnapshot(snapshotId))
-    assertEquals(snapshotId.offset, log.startOffset)
-    assertEquals(Optional.empty(), log.createNewSnapshot(snapshotId))
+    assertEquals(Optional.empty(), log.createNewSnapshot(snapshotId),
+      "Creating an existing snapshot should not do anything")
   }
 
   @Test
@@ -263,71 +276,7 @@ final class KafkaMetadataLogTest {
   }
 
   @Test
-  def testUpdateLogStartOffset(): Unit = {
-    val log = buildMetadataLog(tempDir, mockTime)
-    val offset = 10
-    val epoch = 0
-    val snapshotId = new OffsetAndEpoch(offset, epoch)
-
-    append(log, offset, epoch)
-    log.updateHighWatermark(new LogOffsetMetadata(offset))
-
-    TestUtils.resource(log.createNewSnapshot(snapshotId).get()) { snapshot =>
-      snapshot.freeze()
-    }
-
-    assertTrue(log.deleteBeforeSnapshot(snapshotId))
-    assertEquals(offset, log.startOffset)
-    assertEquals(epoch, log.lastFetchedEpoch)
-    assertEquals(offset, log.endOffset().offset)
-    assertEquals(offset, log.highWatermark.offset)
-
-    val newRecords = 10
-    append(log, newRecords, epoch + 1)
-    // Start offset should not change since a new snapshot was not generated
-    assertFalse(log.deleteBeforeSnapshot(new OffsetAndEpoch(offset + 
newRecords, epoch)))
-    assertEquals(offset, log.startOffset)
-
-    assertEquals(epoch + 1, log.lastFetchedEpoch)
-    assertEquals(offset + newRecords, log.endOffset().offset)
-    assertEquals(offset, log.highWatermark.offset)
-  }
-
-  @Test
-  def testUpdateLogStartOffsetWillRemoveOlderSnapshot(): Unit = {
-    val (logDir, log) = buildMetadataLogAndDir(tempDir, mockTime)
-    val offset = 10
-    val epoch = 0
-
-    append(log, offset, epoch)
-    val oldSnapshotId = new OffsetAndEpoch(offset, epoch)
-    TestUtils.resource(log.storeSnapshot(oldSnapshotId).get()) { snapshot =>
-      snapshot.freeze()
-    }
-
-    append(log, offset, epoch)
-    val newSnapshotId = new OffsetAndEpoch(offset * 2, epoch)
-    TestUtils.resource(log.storeSnapshot(newSnapshotId).get()) { snapshot =>
-      snapshot.freeze()
-    }
-
-    log.updateHighWatermark(new LogOffsetMetadata(offset * 2))
-    assertTrue(log.deleteBeforeSnapshot(newSnapshotId))
-    log.close()
-
-    mockTime.sleep(log.fileDeleteDelayMs)
-    // Assert that the log dir doesn't contain any older snapshots
-    Files
-      .walk(logDir, 1)
-      .map[Optional[SnapshotPath]](Snapshots.parse)
-      .filter(_.isPresent)
-      .forEach { path =>
-        assertFalse(path.get.snapshotId.offset < log.startOffset)
-      }
-  }
-
-  @Test
-  def testUpdateLogStartOffsetWithMissingSnapshot(): Unit = {
+  def testDeleteNonExistentSnapshot(): Unit = {
     val log = buildMetadataLog(tempDir, mockTime)
     val offset = 10
     val epoch = 0
@@ -335,7 +284,7 @@ final class KafkaMetadataLogTest {
     append(log, offset, epoch)
     log.updateHighWatermark(new LogOffsetMetadata(offset))
 
-    assertFalse(log.deleteBeforeSnapshot(new OffsetAndEpoch(1L, epoch)))
+    assertFalse(log.deleteBeforeSnapshot(new OffsetAndEpoch(2L, epoch)))
     assertEquals(0, log.startOffset)
     assertEquals(epoch, log.lastFetchedEpoch)
     assertEquals(offset, log.endOffset().offset)
@@ -343,26 +292,6 @@ final class KafkaMetadataLogTest {
   }
 
   @Test
-  def testFailToIncreaseLogStartPastHighWatermark(): Unit = {
-    val log = buildMetadataLog(tempDir, mockTime)
-    val offset = 10
-    val epoch = 0
-    val snapshotId = new OffsetAndEpoch(2 * offset, 1 + epoch)
-
-    append(log, offset, epoch)
-    log.updateHighWatermark(new LogOffsetMetadata(offset))
-
-    TestUtils.resource(log.storeSnapshot(snapshotId).get()) { snapshot =>
-      snapshot.freeze()
-    }
-
-    assertThrows(
-      classOf[OffsetOutOfRangeException],
-      () => log.deleteBeforeSnapshot(snapshotId)
-    )
-  }
-
-  @Test
   def testTruncateFullyToLatestSnapshot(): Unit = {
     val log = buildMetadataLog(tempDir, mockTime)
     val numberOfRecords = 10
@@ -398,8 +327,7 @@ final class KafkaMetadataLogTest {
 
   @Test
   def testTruncateWillRemoveOlderSnapshot(): Unit = {
-
-    val (logDir, log) = buildMetadataLogAndDir(tempDir, mockTime)
+    val (logDir, log, config) = buildMetadataLogAndDir(tempDir, mockTime)
     val numberOfRecords = 10
     val epoch = 1
 
@@ -432,7 +360,7 @@ final class KafkaMetadataLogTest {
     assertEquals(log.earliestSnapshotId(), log.latestSnapshotId())
     log.close()
 
-    mockTime.sleep(log.fileDeleteDelayMs)
+    mockTime.sleep(config.fileDeleteDelayMs)
     // Assert that the log dir doesn't contain any older snapshots
     Files
       .walk(logDir, 1)
@@ -470,7 +398,7 @@ final class KafkaMetadataLogTest {
 
   @Test
   def testCleanupPartialSnapshots(): Unit = {
-    val (logDir, log) = buildMetadataLogAndDir(tempDir, mockTime)
+    val (logDir, log, config) = buildMetadataLogAndDir(tempDir, mockTime)
     val numberOfRecords = 10
     val epoch = 1
     val snapshotId = new OffsetAndEpoch(1, epoch)
@@ -507,7 +435,7 @@ final class KafkaMetadataLogTest {
 
   @Test
   def testCleanupOlderSnapshots(): Unit = {
-    val (logDir, log) = buildMetadataLogAndDir(tempDir, mockTime)
+    val (logDir, log, config) = buildMetadataLogAndDir(tempDir, mockTime)
     val numberOfRecords = 10
     val epoch = 1
 
@@ -542,7 +470,7 @@ final class KafkaMetadataLogTest {
     assertEquals(greaterSnapshotId, secondLog.latestSnapshotId().get)
     assertEquals(3 * numberOfRecords, secondLog.startOffset)
     assertEquals(epoch, secondLog.lastFetchedEpoch)
-    mockTime.sleep(log.fileDeleteDelayMs)
+    mockTime.sleep(config.fileDeleteDelayMs)
 
     // Assert that the log dir doesn't contain any older snapshots
     Files
@@ -582,7 +510,7 @@ final class KafkaMetadataLogTest {
     val leaderEpoch = 5
     val maxBatchSizeInBytes = 16384
     val recordSize = 64
-    val log = buildMetadataLog(tempDir, mockTime, maxBatchSizeInBytes)
+    val log = buildMetadataLog(tempDir, mockTime, 
DefaultMetadataLogConfig.copy(maxBatchSizeInBytes = maxBatchSizeInBytes))
 
     val oversizeBatch = buildFullBatch(leaderEpoch, recordSize, 
maxBatchSizeInBytes + recordSize)
     assertThrows(classOf[RecordTooLargeException], () => {
@@ -661,10 +589,9 @@ final class KafkaMetadataLogTest {
     log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
 
     val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
-    TestUtils.resource(log.storeSnapshot(snapshotId).get()) { snapshot =>
+    TestUtils.resource(log.createNewSnapshot(snapshotId).get()) { snapshot =>
       snapshot.freeze()
     }
-    assertTrue(log.deleteBeforeSnapshot(snapshotId))
 
     val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, 
epoch - 1)
     assertEquals(ValidOffsetAndEpoch.Kind.SNAPSHOT, resultOffsetAndEpoch.kind)
@@ -682,10 +609,11 @@ final class KafkaMetadataLogTest {
     log.updateHighWatermark(new LogOffsetMetadata(offset))
 
     val snapshotId = new OffsetAndEpoch(offset, epoch)
-    TestUtils.resource(log.storeSnapshot(snapshotId).get()) { snapshot =>
+    TestUtils.resource(log.createNewSnapshot(snapshotId).get()) { snapshot =>
       snapshot.freeze()
     }
-    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+    // Simulate log cleaning advancing the log start offset (LSO)
+    log.log.maybeIncrementLogStartOffset(offset, SegmentDeletion);
 
     val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset - 1, epoch)
     assertEquals(ValidOffsetAndEpoch.Kind.SNAPSHOT, resultOffsetAndEpoch.kind)
@@ -703,10 +631,9 @@ final class KafkaMetadataLogTest {
     log.updateHighWatermark(new LogOffsetMetadata(offset))
 
     val snapshotId = new OffsetAndEpoch(offset, epoch)
-    TestUtils.resource(log.storeSnapshot(snapshotId).get()) { snapshot =>
+    TestUtils.resource(log.createNewSnapshot(snapshotId).get()) { snapshot =>
       snapshot.freeze()
     }
-    assertTrue(log.deleteBeforeSnapshot(snapshotId))
 
     val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch)
     assertEquals(ValidOffsetAndEpoch.Kind.VALID, resultOffsetAndEpoch.kind)
@@ -800,6 +727,125 @@ final class KafkaMetadataLogTest {
     assertEquals(ValidOffsetAndEpoch.Kind.VALID, resultOffsetAndEpoch.kind)
     assertEquals(new OffsetAndEpoch(numberOfRecords - 1, epoch), 
resultOffsetAndEpoch.offsetAndEpoch())
   }
+
+  @Test
+  def testAdvanceLogStartOffsetAfterCleaning(): Unit = {
+    val config = MetadataLogConfig(
+      logSegmentBytes = 512,
+      logSegmentMillis = 10 * 1000,
+      retentionMaxBytes = 256,
+      retentionMillis = 60 * 1000,
+      maxBatchSizeInBytes = 512,
+      maxFetchSizeInBytes = DefaultMetadataLogConfig.maxFetchSizeInBytes,
+      fileDeleteDelayMs = Defaults.FileDeleteDelayMs,
+      nodeId = 1
+    )
+    config.copy()
+    val log = buildMetadataLog(tempDir, mockTime, config)
+
+    // Generate some segments
+    for(_ <- 0 to 100) {
+      append(log, 47, 1) // An odd number of records to avoid offset alignment
+    }
+    assertFalse(log.maybeClean(), "Should not clean since HW was still 0")
+
+    log.updateHighWatermark(new LogOffsetMetadata(4000))
+    assertFalse(log.maybeClean(), "Should not clean since no snapshots exist")
+
+    val snapshotId1 = new OffsetAndEpoch(1000, 1)
+    TestUtils.resource(log.storeSnapshot(snapshotId1).get()) { snapshot =>
+      append(snapshot, 100)
+      snapshot.freeze()
+    }
+
+    val snapshotId2 = new OffsetAndEpoch(2000, 1)
+    TestUtils.resource(log.storeSnapshot(snapshotId2).get()) { snapshot =>
+      append(snapshot, 100)
+      snapshot.freeze()
+    }
+
+    val lsoBefore = log.startOffset()
+    assertTrue(log.maybeClean(), "Expected to clean since there was at least 
one snapshot")
+    val lsoAfter = log.startOffset()
+    assertTrue(lsoAfter > lsoBefore, "Log Start Offset should have increased 
after cleaning")
+    assertTrue(lsoAfter == snapshotId2.offset, "Expected the Log Start Offset 
to equal the second snapshot's offset after cleaning")
+  }
+
+  @Test
+  def testDeleteSnapshots(): Unit = {
+    // Generate some logs and a few snapshots, set retention low and verify 
that cleaning occurs
+    val config = DefaultMetadataLogConfig.copy(
+      logSegmentBytes = 1024,
+      logSegmentMillis = 10 * 1000,
+      retentionMaxBytes = 1024,
+      retentionMillis = 60 * 1000,
+      maxBatchSizeInBytes = 100
+    )
+    val log = buildMetadataLog(tempDir, mockTime, config)
+
+    for(_ <- 0 to 1000) {
+      append(log, 1, 1)
+    }
+    log.updateHighWatermark(new LogOffsetMetadata(1001))
+
+    for(offset <- Seq(100, 200, 300, 400, 500, 600)) {
+      val snapshotId = new OffsetAndEpoch(offset, 1)
+      TestUtils.resource(log.storeSnapshot(snapshotId).get()) { snapshot =>
+        append(snapshot, 10)
+        snapshot.freeze()
+      }
+    }
+
+    assertEquals(6, log.snapshotCount())
+    assertTrue(log.maybeClean())
+    assertEquals(1, log.snapshotCount(), "Expected only one snapshot after 
cleaning")
+    assertOptional(log.latestSnapshotId(), (snapshotId: OffsetAndEpoch) => {
+      assertEquals(600, snapshotId.offset)
+    })
+    assertEquals(log.startOffset, 600)
+  }
+
+  @Test
+  def testSoftRetentionLimit(): Unit = {
+    // Set retention equal to the segment size and generate slightly more than 
one segment of logs
+    val config = DefaultMetadataLogConfig.copy(
+      logSegmentBytes = 10240,
+      logSegmentMillis = 10 * 1000,
+      retentionMaxBytes = 10240,
+      retentionMillis = 60 * 1000,
+      maxBatchSizeInBytes = 100
+    )
+    val log = buildMetadataLog(tempDir, mockTime, config)
+
+    for(_ <- 0 to 2000) {
+      append(log, 1, 1)
+    }
+    log.updateHighWatermark(new LogOffsetMetadata(2000))
+
+    // Then generate two snapshots
+    val snapshotId1 = new OffsetAndEpoch(1000, 1)
+    TestUtils.resource(log.storeSnapshot(snapshotId1).get()) { snapshot =>
+      append(snapshot, 500)
+      snapshot.freeze()
+    }
+
+    // Then generate a second snapshot
+    val snapshotId2 = new OffsetAndEpoch(2000, 1)
+    TestUtils.resource(log.storeSnapshot(snapshotId2).get()) { snapshot =>
+      append(snapshot, 500)
+      snapshot.freeze()
+    }
+
+    // Cleaning should occur, but resulting size will not be under retention 
limit since we have to keep one snapshot
+    assertTrue(log.maybeClean())
+    assertEquals(1, log.snapshotCount(), "Expected one snapshot after 
cleaning")
+    assertOptional(log.latestSnapshotId(), (snapshotId: OffsetAndEpoch) => {
+      assertEquals(2000, snapshotId.offset, "Unexpected offset for latest 
snapshot")
+      assertOptional(log.readSnapshot(snapshotId), (reader: RawSnapshotReader) 
=> {
+        assertTrue(reader.sizeInBytes() + log.log.size > 
config.retentionMaxBytes)
+      })
+    })
+  }
 }
 
 object KafkaMetadataLogTest {
@@ -817,12 +863,22 @@ object KafkaMetadataLogTest {
     }
   }
 
+  val DefaultMetadataLogConfig = MetadataLogConfig(
+    logSegmentBytes = 100 * 1024,
+    logSegmentMillis = 10 * 1000,
+    retentionMaxBytes = 100 * 1024,
+    retentionMillis = 60 * 1000,
+    maxBatchSizeInBytes = KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
+    maxFetchSizeInBytes = KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
+    fileDeleteDelayMs = Defaults.FileDeleteDelayMs,
+    nodeId = 1
+  )
+
   def buildMetadataLogAndDir(
     tempDir: File,
     time: MockTime,
-    maxBatchSizeInBytes: Int = KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
-    maxFetchSizeInBytes: Int = KafkaRaftClient.MAX_FETCH_SIZE_BYTES
-  ): (Path, KafkaMetadataLog) = {
+    metadataLogConfig: MetadataLogConfig = DefaultMetadataLogConfig
+  ): (Path, KafkaMetadataLog, MetadataLogConfig) = {
 
     val logDir = createLogDirectory(
       tempDir,
@@ -835,20 +891,18 @@ object KafkaMetadataLogTest {
       logDir,
       time,
       time.scheduler,
-      maxBatchSizeInBytes,
-      maxFetchSizeInBytes
+      metadataLogConfig
     )
 
-    (logDir.toPath, metadataLog)
+    (logDir.toPath, metadataLog, metadataLogConfig)
   }
 
   def buildMetadataLog(
     tempDir: File,
     time: MockTime,
-    maxBatchSizeInBytes: Int = KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
-    maxFetchSizeInBytes: Int = KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+    metadataLogConfig: MetadataLogConfig = DefaultMetadataLogConfig,
   ): KafkaMetadataLog = {
-    val (_, log) = buildMetadataLogAndDir(tempDir, time, maxBatchSizeInBytes, 
maxFetchSizeInBytes)
+    val (_, log, _) = buildMetadataLogAndDir(tempDir, time, metadataLogConfig)
     log
   }
 
@@ -864,6 +918,15 @@ object KafkaMetadataLogTest {
     )
   }
 
+  def append(snapshotWriter: RawSnapshotWriter, numberOfRecords: Int): Unit = {
+    snapshotWriter.append(MemoryRecords.withRecords(
+      0,
+      CompressionType.NONE,
+      0,
+      (0 until numberOfRecords).map(number => new 
SimpleRecord(number.toString.getBytes)): _*
+    ))
+  }
+
   private def createLogDirectory(logDir: File, logDirName: String): File = {
     val logDirPath = logDir.getAbsolutePath
     val dir = new File(logDirPath, logDirName)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 7241bf9..b555b01 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -603,6 +603,10 @@ class KafkaConfigTest {
         case KafkaConfig.BrokerSessionTimeoutMsProp => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
         case KafkaConfig.NodeIdProp => assertPropertyInvalid(baseProperties, 
name, "not_a_number")
         case KafkaConfig.MetadataLogDirProp => // ignore string
+        case KafkaConfig.MetadataLogSegmentBytesProp => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
+        case KafkaConfig.MetadataLogSegmentMillisProp => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
+        case KafkaConfig.MetadataMaxRetentionBytesProp => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
+        case KafkaConfig.MetadataMaxRetentionMillisProp => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
         case KafkaConfig.ControllerListenerNamesProp => // ignore string
 
         case KafkaConfig.AuthorizerClassNameProp => //ignore string
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java 
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 9e53d2c..10a4488 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -160,6 +160,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
     private final KafkaRaftMetrics kafkaRaftMetrics;
     private final QuorumState quorum;
     private final RequestManager requestManager;
+    private final RaftMetadataLogCleanerManager snapshotCleaner;
 
     private final List<ListenerContext> listenerContexts = new ArrayList<>();
     private final ConcurrentLinkedQueue<Listener<T>> pendingListeners = new 
ConcurrentLinkedQueue<>();
@@ -230,7 +231,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
         this.logger = logContext.logger(KafkaRaftClient.class);
         this.random = random;
         this.raftConfig = raftConfig;
-
+        this.snapshotCleaner = new RaftMetadataLogCleanerManager(logger, time, 
60000, log::maybeClean);
         Set<Integer> quorumVoterIds = raftConfig.quorumVoterIds();
         this.requestManager = new RequestManager(quorumVoterIds, 
raftConfig.retryBackoffMs(),
             raftConfig.requestTimeoutMs(), random);
@@ -2091,8 +2092,6 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
     }
 
     private long pollCurrentState(long currentTimeMs) {
-        maybeDeleteBeforeSnapshot();
-
         if (quorum.isLeader()) {
             return pollLeader(currentTimeMs);
         } else if (quorum.isCandidate()) {
@@ -2155,14 +2154,34 @@ public class KafkaRaftClient<T> implements 
RaftClient<T> {
         return false;
     }
 
-    private void maybeDeleteBeforeSnapshot() {
-        log.latestSnapshotId().ifPresent(snapshotId -> {
-            quorum.highWatermark().ifPresent(highWatermark -> {
-                if (highWatermark.offset >= snapshotId.offset) {
-                    log.deleteBeforeSnapshot(snapshotId);
+    /**
+     * A simple timer-based log cleaner that periodically invokes a cleaning callback
+     */
+    private static class RaftMetadataLogCleanerManager {
+        private final Logger logger;
+        private final Timer timer;
+        private final long delayMs;
+        private final Runnable cleaner;
+
+        RaftMetadataLogCleanerManager(Logger logger, Time time, long delayMs, 
Runnable cleaner) {
+            this.logger = logger;
+            this.timer = time.timer(delayMs);
+            this.delayMs = delayMs;
+            this.cleaner = cleaner;
+        }
+
+        public long maybeClean(long currentTimeMs) {
+            timer.update(currentTimeMs);
+            if (timer.isExpired()) {
+                try {
+                    cleaner.run();
+                } catch (Throwable t) {
+                    logger.error("Had an error during log cleaning", t);
                 }
-            });
-        });
+                timer.reset(delayMs);
+            }
+            return timer.remainingMs();
+        }
     }
 
     private void wakeup() {
@@ -2191,7 +2210,10 @@ public class KafkaRaftClient<T> implements RaftClient<T> 
{
             return;
         }
 
-        long pollTimeoutMs = pollCurrentState(currentTimeMs);
+        long pollStateTimeoutMs = pollCurrentState(currentTimeMs);
+        long cleaningTimeoutMs = snapshotCleaner.maybeClean(currentTimeMs);
+        long pollTimeoutMs = Math.min(pollStateTimeoutMs, cleaningTimeoutMs);
+
         kafkaRaftMetrics.updatePollStart(currentTimeMs);
 
         RaftMessage message = messageQueue.poll(pollTimeoutMs);
diff --git a/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java 
b/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
index 01c6aa4..1b49b3a 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
@@ -175,12 +175,12 @@ public interface ReplicatedLog extends AutoCloseable {
     void updateHighWatermark(LogOffsetMetadata offsetMetadata);
 
     /**
-     * Updates the log start offset and delete segments if necessary.
+     * Delete all snapshots prior to the given snapshot
      *
      * The replicated log's start offset can be increased and older segments 
can be deleted when
      * there is a snapshot greater than the current log start offset.
      */
-    boolean deleteBeforeSnapshot(OffsetAndEpoch logStartSnapshotId);
+    boolean deleteBeforeSnapshot(OffsetAndEpoch snapshotId);
 
     /**
      * Flush the current log to disk.
@@ -188,6 +188,11 @@ public interface ReplicatedLog extends AutoCloseable {
     void flush();
 
     /**
+     * Possibly perform cleaning of snapshots and logs
+     */
+    boolean maybeClean();
+
+    /**
      * Get the last offset which has been flushed to disk.
      */
     long lastFlushedOffset();
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java 
b/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
index ce0c7dd..a4d3b5a 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
@@ -116,7 +116,13 @@ public final class Snapshots {
         Path immutablePath = snapshotPath(logDir, snapshotId);
         Path deletedPath = deleteRename(immutablePath, snapshotId);
         try {
-            return Files.deleteIfExists(immutablePath) | 
Files.deleteIfExists(deletedPath);
+            boolean deleted = Files.deleteIfExists(immutablePath) | 
Files.deleteIfExists(deletedPath);
+            if (deleted) {
+                log.info("Deleted snapshot files for snapshot {}.", 
snapshotId);
+            } else {
+                log.info("Did not delete snapshot files for snapshot {} since 
they did not exist.", snapshotId);
+            }
+            return deleted;
         } catch (IOException e) {
             log.error("Error deleting snapshot files {} and {}", 
immutablePath, deletedPath, e);
             return false;
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
index df9da9d..04b5676 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
@@ -49,7 +49,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 final public class KafkaRaftClientSnapshotTest {
     @Test
-    public void testLeaderListernerNotified() throws Exception {
+    public void testLeaderListenerNotified() throws Exception {
         int localId = 0;
         int otherNodeId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@@ -128,6 +128,7 @@ final public class KafkaRaftClientSnapshotTest {
             .appendToLog(snapshotId.epoch, Arrays.asList("a", "b", "c"))
             .appendToLog(snapshotId.epoch, Arrays.asList("d", "e", "f"))
             .withEmptySnapshot(snapshotId)
+            .deleteBeforeSnapshot(snapshotId)
             .withElectedLeader(epoch, leaderId)
             .build();
 
@@ -196,6 +197,7 @@ final public class KafkaRaftClientSnapshotTest {
             assertEquals(secondSnapshotId, snapshot.snapshotId());
             snapshot.freeze();
         }
+        context.log.deleteBeforeSnapshot(secondSnapshotId);
         context.client.poll();
 
         // Resume the listener from reading commit batches
@@ -244,11 +246,9 @@ final public class KafkaRaftClientSnapshotTest {
             assertEquals(snapshotId, snapshot.snapshotId());
             snapshot.freeze();
         }
-
+        context.log.deleteBeforeSnapshot(snapshotId);
         context.client.poll();
 
-        assertEquals(snapshotId.offset, context.log.startOffset());
-
         // Send Fetch request less than start offset
         context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 0, 
epoch, 0));
         context.pollUntilResponse();
@@ -425,6 +425,7 @@ final public class KafkaRaftClientSnapshotTest {
             assertEquals(oldestSnapshotId, snapshot.snapshotId());
             snapshot.freeze();
         }
+        context.log.deleteBeforeSnapshot(oldestSnapshotId);
         context.client.poll();
 
         // Send fetch with log start offset and invalid last fetched epoch
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLog.java 
b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
index 3563016..fb2168b 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLog.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
@@ -317,6 +317,11 @@ public class MockLog implements ReplicatedLog {
     }
 
     @Override
+    public boolean maybeClean() {
+        return false;
+    }
+
+    @Override
     public long lastFlushedOffset() {
         return lastFlushedOffset;
     }

Reply via email to