This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 258ed5a KAFKA-12155: Metadata log and snapshot cleaning #10864
258ed5a is described below
commit 258ed5a744d7a1d6a60c21844d0383dc7ca9ac8c
Author: David Arthur <[email protected]>
AuthorDate: Mon Jun 7 16:25:28 2021 -0400
KAFKA-12155: Metadata log and snapshot cleaning #10864
This PR includes changes to KafkaRaftClient and KafkaMetadataLog to support
periodic
cleaning of old log segments and snapshots.
Four new public config keys are introduced: metadata.log.segment.bytes,
metadata.log.segment.ms, metadata.max.retention.bytes, and
metadata.max.retention.ms.
These are used to configure the log layer as well as the snapshot cleaning
logic. Snapshot
and log cleaning is performed based on two criteria: total metadata log +
snapshot size
(metadata.max.retention.bytes), and max age of a snapshot
(metadata.max.retention.ms).
Since we have a requirement that the log start offset must always align
with a snapshot,
we perform the cleaning on snapshots first and then clean what logs we can.
The cleaning algorithm is as follows:
1. Delete the oldest snapshot.
2. Advance the log start offset to the new oldest snapshot.
3. Request that the log layer clean any segments prior to the new log start
offset
4. Repeat this until the retention size or time is no longer violated, or
only a single
snapshot remains.
The cleaning process is triggered every 60 seconds from the KafkaRaftClient
polling
thread.
Reviewers: José Armando García Sancio <[email protected]>, dengziming
<[email protected]>, Colin P. McCabe <[email protected]>
---
.../main/scala/kafka/raft/KafkaMetadataLog.scala | 227 ++++++++++++---
core/src/main/scala/kafka/raft/RaftManager.scala | 3 +-
core/src/main/scala/kafka/server/KafkaConfig.scala | 25 +-
.../scala/kafka/raft/KafkaMetadataLogTest.scala | 321 ++++++++++++---------
.../scala/unit/kafka/server/KafkaConfigTest.scala | 4 +
.../org/apache/kafka/raft/KafkaRaftClient.java | 44 ++-
.../java/org/apache/kafka/raft/ReplicatedLog.java | 9 +-
.../java/org/apache/kafka/snapshot/Snapshots.java | 8 +-
.../kafka/raft/KafkaRaftClientSnapshotTest.java | 9 +-
.../test/java/org/apache/kafka/raft/MockLog.java | 5 +
10 files changed, 470 insertions(+), 185 deletions(-)
diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index 2a9e7a1..df8e1af 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -16,34 +16,37 @@
*/
package kafka.raft
-import java.io.File
-import java.nio.file.{Files, NoSuchFileException, Path}
-import java.util.{Optional, Properties}
import kafka.api.ApiVersion
-import kafka.log.{AppendOrigin, Log, LogConfig, LogOffsetSnapshot,
SnapshotGenerated}
-import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd,
LogDirFailureChannel, RequestLocal}
+import kafka.log.{AppendOrigin, Defaults, Log, LogConfig, LogOffsetSnapshot,
SnapshotGenerated}
+import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd,
KafkaConfig, LogDirFailureChannel, RequestLocal}
import kafka.utils.{CoreUtils, Logging, Scheduler}
-import org.apache.kafka.common.record.{MemoryRecords, Records}
-import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.config.AbstractConfig
+import org.apache.kafka.common.record.{ControlRecordUtils, MemoryRecords,
Records}
+import org.apache.kafka.common.utils.{BufferSupplier, Time}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.raft.{Isolation, LogAppendInfo, LogFetchInfo,
LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog,
ValidOffsetAndEpoch}
import org.apache.kafka.snapshot.{FileRawSnapshotReader,
FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath,
Snapshots}
+import java.io.File
+import java.nio.file.{Files, NoSuchFileException, Path}
+import java.util.{Optional, Properties}
import scala.annotation.nowarn
import scala.collection.mutable
import scala.compat.java8.OptionConverters._
final class KafkaMetadataLog private (
- log: Log,
+ val log: Log,
+ time: Time,
scheduler: Scheduler,
// Access to this object needs to be synchronized because it is used by the
snapshotting thread to notify the
// polling thread when snapshots are created. This object is also used to
store any opened snapshot reader.
snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]],
topicPartition: TopicPartition,
- maxFetchSizeInBytes: Int,
- val fileDeleteDelayMs: Long // Visible for testing,
+ config: MetadataLogConfig
) extends ReplicatedLog with Logging {
+ this.logIdent = s"[MetadataLog partition=$topicPartition,
nodeId=${config.nodeId}] "
+
override def read(startOffset: Long, readIsolation: Isolation): LogFetchInfo
= {
val isolation = readIsolation match {
case Isolation.COMMITTED => FetchHighWatermark
@@ -52,7 +55,7 @@ final class KafkaMetadataLog private (
}
val fetchInfo = log.read(startOffset,
- maxLength = maxFetchSizeInBytes,
+ maxLength = config.maxFetchSizeInBytes,
isolation = isolation,
minOneMessage = true)
@@ -91,10 +94,6 @@ final class KafkaMetadataLog private (
private def handleAndConvertLogAppendInfo(appendInfo:
kafka.log.LogAppendInfo): LogAppendInfo = {
appendInfo.firstOffset match {
case Some(firstOffset) =>
- if (firstOffset.relativePositionInSegment == 0) {
- // Assume that a new segment was created if the relative position is 0
- log.deleteOldSegments()
- }
new LogAppendInfo(firstOffset.messageOffset, appendInfo.lastOffset)
case None =>
throw new KafkaException(s"Append failed unexpectedly:
${appendInfo.errorMessage}")
@@ -312,29 +311,155 @@ final class KafkaMetadataLog private (
}
}
- override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch):
Boolean = {
+ /**
+ * Delete snapshots that come before a given snapshot ID. This is done by
advancing the log start offset to the given
+ * snapshot and cleaning old log segments.
+ *
+ * This will only happen if the following invariants all hold true:
+ *
+ * <li>The given snapshot precedes the latest snapshot</li>
+ * <li>The offset of the given snapshot is greater than the log start
offset</li>
+ * <li>The log layer can advance the offset to the given snapshot</li>
+ *
+ * This method is thread-safe
+ */
+ override def deleteBeforeSnapshot(snapshotId: OffsetAndEpoch): Boolean = {
val (deleted, forgottenSnapshots) = snapshots synchronized {
latestSnapshotId().asScala match {
- case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
- startOffset < logStartSnapshotId.offset &&
- logStartSnapshotId.offset <= snapshotId.offset &&
- log.maybeIncrementLogStartOffset(logStartSnapshotId.offset,
SnapshotGenerated)) =>
-
- // Delete all segments that have a "last offset" less than the log
start offset
- log.deleteOldSegments()
-
- // Forget snapshots less than the log start offset
- (true, forgetSnapshotsBefore(logStartSnapshotId))
+ case Some(latestSnapshotId) if
+ snapshots.contains(snapshotId) &&
+ startOffset < snapshotId.offset &&
+ snapshotId.offset <= latestSnapshotId.offset &&
+ log.maybeIncrementLogStartOffset(snapshotId.offset,
SnapshotGenerated) =>
+ // Delete all segments that have a "last offset" less than the log
start offset
+ log.deleteOldSegments()
+ // Remove older snapshots from the snapshots cache
+ (true, forgetSnapshotsBefore(snapshotId))
case _ =>
- (false, mutable.TreeMap.empty[OffsetAndEpoch,
Option[FileRawSnapshotReader]])
+ (false, mutable.TreeMap.empty[OffsetAndEpoch,
Option[FileRawSnapshotReader]])
}
}
-
removeSnapshots(forgottenSnapshots)
deleted
}
/**
+ * Force all known snapshots to have an open reader so we can know their
sizes. This method is not thread-safe
+ */
+ private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {
+ snapshots.keys.toSeq.flatMap {
+ snapshotId => readSnapshot(snapshotId).asScala.map { reader =>
(snapshotId, reader.sizeInBytes())}
+ }
+ }
+
+ /**
+ * Return the max timestamp of the first batch in a snapshot, if the
snapshot exists and has records
+ */
+ private def readSnapshotTimestamp(snapshotId: OffsetAndEpoch): Option[Long]
= {
+ readSnapshot(snapshotId).asScala.flatMap { reader =>
+ val batchIterator = reader.records().batchIterator()
+
+ val firstBatch = batchIterator.next()
+ val records = firstBatch.streamingIterator(new
BufferSupplier.GrowableBufferSupplier())
+ if (firstBatch.isControlBatch) {
+ val header =
ControlRecordUtils.deserializedSnapshotHeaderRecord(records.next());
+ Some(header.lastContainedLogTimestamp())
+ } else {
+ warn("Did not find control record at beginning of snapshot")
+ None
+ }
+ }
+ }
+
+ /**
+ * Perform cleaning of old snapshots and log segments based on size.
+ *
+ * If our configured retention size has been violated, we perform cleaning
as follows:
+ *
+ * <li>Find oldest snapshot and delete it</li>
+ * <li>Advance log start offset to end of next oldest snapshot</li>
+ * <li>Delete log segments which wholly precede the new log start offset</li>
+ *
+ * This process is repeated until the retention size is no longer violated,
or until only
+ * a single snapshot remains.
+ */
+ override def maybeClean(): Boolean = {
+ snapshots synchronized {
+ var didClean = false
+ didClean |= cleanSnapshotsRetentionSize()
+ didClean |= cleanSnapshotsRetentionMs()
+ didClean
+ }
+ }
+
+ /**
+ * Iterate through the snapshots and test the given predicate to see if we
should attempt to delete it. Since
+ * we have some additional invariants regarding snapshots and log segments
we cannot simply delete a snapshot in
+ * all cases.
+ *
+ * For the given predicate, we are testing if the snapshot identified by the
first argument should be deleted.
+ */
+ private def cleanSnapshots(predicate: (OffsetAndEpoch) => Boolean): Boolean
= {
+ if (snapshots.size < 2)
+ return false
+
+ var didClean = false
+ snapshots.keys.toSeq.sliding(2).toSeq.takeWhile {
+ case Seq(snapshot: OffsetAndEpoch, nextSnapshot: OffsetAndEpoch) =>
+ if (predicate(snapshot) && deleteBeforeSnapshot(nextSnapshot)) {
+ didClean = true
+ true
+ } else {
+ false
+ }
+ case _ => false // Shouldn't get here with the sliding window
+ }
+ didClean
+ }
+
+ private def cleanSnapshotsRetentionMs(): Boolean = {
+ if (config.retentionMillis < 0)
+ return false
+
+ // Keep deleting snapshots as long as they are older than the retention time
+ def shouldClean(snapshotId: OffsetAndEpoch): Boolean = {
+ val now = time.milliseconds()
+ readSnapshotTimestamp(snapshotId).exists { timestamp =>
+ if (now - timestamp > config.retentionMillis) {
+ true
+ } else {
+ false
+ }
+ }
+ }
+
+ cleanSnapshots(shouldClean)
+ }
+
+ private def cleanSnapshotsRetentionSize(): Boolean = {
+ if (config.retentionMaxBytes < 0)
+ return false
+
+ val snapshotSizes = loadSnapshotSizes().toMap
+
+ var snapshotTotalSize: Long = snapshotSizes.values.sum
+
+ // Keep deleting snapshots and segments as long as we exceed the retention
size
+ def shouldClean(snapshotId: OffsetAndEpoch): Boolean = {
+ snapshotSizes.get(snapshotId).exists { snapshotSize =>
+ if (log.size + snapshotTotalSize > config.retentionMaxBytes) {
+ snapshotTotalSize -= snapshotSize
+ true
+ } else {
+ false
+ }
+ }
+ }
+
+ cleanSnapshots(shouldClean)
+ }
+
+ /**
* Forget the snapshots earlier than a given snapshot id and return the
associated
* snapshot readers.
*
@@ -358,6 +483,7 @@ final class KafkaMetadataLog private (
expiredSnapshots: mutable.TreeMap[OffsetAndEpoch,
Option[FileRawSnapshotReader]]
): Unit = {
expiredSnapshots.foreach { case (snapshotId, _) =>
+ info(s"Marking snapshot $snapshotId for deletion")
Snapshots.markForDelete(log.dir.toPath, snapshotId)
}
@@ -365,7 +491,7 @@ final class KafkaMetadataLog private (
scheduler.schedule(
"delete-snapshot-files",
KafkaMetadataLog.deleteSnapshotFiles(log.dir.toPath, expiredSnapshots,
this),
- fileDeleteDelayMs
+ config.fileDeleteDelayMs
)
}
}
@@ -377,22 +503,53 @@ final class KafkaMetadataLog private (
snapshots.clear()
}
}
+
+ private[raft] def snapshotCount(): Int = {
+ snapshots synchronized {
+ snapshots.size
+ }
+ }
}
-object KafkaMetadataLog {
+object MetadataLogConfig {
+ def apply(config: AbstractConfig, maxBatchSizeInBytes: Int,
maxFetchSizeInBytes: Int): MetadataLogConfig = {
+ new MetadataLogConfig(
+ config.getInt(KafkaConfig.MetadataLogSegmentBytesProp),
+ config.getLong(KafkaConfig.MetadataLogSegmentMillisProp),
+ config.getLong(KafkaConfig.MetadataMaxRetentionBytesProp),
+ config.getLong(KafkaConfig.MetadataMaxRetentionMillisProp),
+ maxBatchSizeInBytes,
+ maxFetchSizeInBytes,
+ Defaults.FileDeleteDelayMs,
+ config.getInt(KafkaConfig.NodeIdProp)
+ )
+ }
+}
+
+case class MetadataLogConfig(logSegmentBytes: Int,
+ logSegmentMillis: Long,
+ retentionMaxBytes: Long,
+ retentionMillis: Long,
+ maxBatchSizeInBytes: Int,
+ maxFetchSizeInBytes: Int,
+ fileDeleteDelayMs: Int,
+ nodeId: Int)
+object KafkaMetadataLog {
def apply(
topicPartition: TopicPartition,
topicId: Uuid,
dataDir: File,
time: Time,
scheduler: Scheduler,
- maxBatchSizeInBytes: Int,
- maxFetchSizeInBytes: Int
+ config: MetadataLogConfig
): KafkaMetadataLog = {
val props = new Properties()
- props.put(LogConfig.MaxMessageBytesProp, maxBatchSizeInBytes.toString)
+ props.put(LogConfig.MaxMessageBytesProp,
config.maxBatchSizeInBytes.toString)
props.put(LogConfig.MessageFormatVersionProp,
ApiVersion.latestVersion.toString)
+ props.put(LogConfig.SegmentBytesProp, Int.box(config.logSegmentBytes))
+ props.put(LogConfig.SegmentMsProp, Long.box(config.logSegmentMillis))
+ props.put(LogConfig.FileDeleteDelayMsProp,
Int.box(Defaults.FileDeleteDelayMs))
LogConfig.validateValues(props)
val defaultLogConfig = LogConfig(props)
@@ -415,11 +572,11 @@ object KafkaMetadataLog {
val metadataLog = new KafkaMetadataLog(
log,
+ time,
scheduler,
recoverSnapshots(log),
topicPartition,
- maxFetchSizeInBytes,
- defaultLogConfig.fileDeleteDelayMs
+ config
)
// When recovering, truncate fully if the latest snapshot is after the log
end offset. This can happen to a follower
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala
b/core/src/main/scala/kafka/raft/RaftManager.scala
index a1dcf64..1ab63b8 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -254,8 +254,7 @@ class KafkaRaftManager[T](
dataDir,
time,
scheduler,
- maxBatchSizeInBytes = KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
- maxFetchSizeInBytes = KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+ config = MetadataLogConfig(config, KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
KafkaRaftClient.MAX_FETCH_SIZE_BYTES)
)
}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index fa8ddf0..6a3c97b 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -378,6 +378,10 @@ object KafkaConfig {
val MetadataSnapshotMaxNewRecordBytesProp =
"metadata.log.max.record.bytes.between.snapshots"
val ControllerListenerNamesProp = "controller.listener.names"
val SaslMechanismControllerProtocolProp =
"sasl.mechanism.controller.protocol"
+ val MetadataLogSegmentBytesProp = "metadata.log.segment.bytes"
+ val MetadataLogSegmentMillisProp = "metadata.log.segment.ms"
+ val MetadataMaxRetentionBytesProp = "metadata.max.retention.bytes"
+ val MetadataMaxRetentionMillisProp = "metadata.max.retention.ms"
/************* Authorizer Configuration ***********/
val AuthorizerClassNameProp = "authorizer.class.name"
@@ -670,6 +674,12 @@ object KafkaConfig {
val ControllerListenerNamesDoc = "A comma-separated list of the names of the
listeners used by the controller. This is required " +
"if running in KRaft mode. The ZK-based controller will not use this
configuration."
val SaslMechanismControllerProtocolDoc = "SASL mechanism used for
communication with controllers. Default is GSSAPI."
+ val MetadataLogSegmentBytesDoc = "The maximum size of a single metadata log
file."
+ val MetadataLogSegmentMillisDoc = "The maximum time before a new metadata
log file is rolled out (in milliseconds)."
+ val MetadataMaxRetentionBytesDoc = "The maximum combined size of the
metadata log and snapshots before deleting old " +
+ "snapshots and log files. Since at least one snapshot must exist before
any logs can be deleted, this is a soft limit."
+ val MetadataMaxRetentionMillisDoc = "The number of milliseconds to keep a
metadata log file or snapshot before " +
+ "deleting it. Since at least one snapshot must exist before any logs can
be deleted, this is a soft limit."
/************* Authorizer Configuration ***********/
val AuthorizerClassNameDoc = s"The fully qualified name of a class that
implements s${classOf[Authorizer].getName}" +
@@ -1048,14 +1058,21 @@ object KafkaConfig {
*/
.define(MetadataSnapshotMaxNewRecordBytesProp, LONG,
Defaults.MetadataSnapshotMaxNewRecordBytes, atLeast(1), HIGH,
MetadataSnapshotMaxNewRecordBytesDoc)
+ /*
+ * KRaft mode private configs. Note that these configs are defined as
internal. We will make them public in the 3.0.0 release.
+ */
.define(ProcessRolesProp, LIST, Collections.emptyList(),
ValidList.in("broker", "controller"), HIGH, ProcessRolesDoc)
.define(NodeIdProp, INT, Defaults.EmptyNodeId, null, HIGH, NodeIdDoc)
.define(InitialBrokerRegistrationTimeoutMsProp, INT,
Defaults.InitialBrokerRegistrationTimeoutMs, null, MEDIUM,
InitialBrokerRegistrationTimeoutMsDoc)
.define(BrokerHeartbeatIntervalMsProp, INT,
Defaults.BrokerHeartbeatIntervalMs, null, MEDIUM, BrokerHeartbeatIntervalMsDoc)
.define(BrokerSessionTimeoutMsProp, INT,
Defaults.BrokerSessionTimeoutMs, null, MEDIUM, BrokerSessionTimeoutMsDoc)
- .define(MetadataLogDirProp, STRING, null, null, HIGH, MetadataLogDirDoc)
.define(ControllerListenerNamesProp, STRING, null, null, HIGH,
ControllerListenerNamesDoc)
.define(SaslMechanismControllerProtocolProp, STRING,
SaslConfigs.DEFAULT_SASL_MECHANISM, null, HIGH,
SaslMechanismControllerProtocolDoc)
+ .define(MetadataLogDirProp, STRING, null, null, HIGH, MetadataLogDirDoc)
+ .define(MetadataLogSegmentBytesProp, INT, Defaults.LogSegmentBytes,
atLeast(Records.LOG_OVERHEAD), HIGH, MetadataLogSegmentBytesDoc)
+ .define(MetadataLogSegmentMillisProp, LONG, Defaults.LogRollHours * 60 *
60 * 1000L, null, HIGH, MetadataLogSegmentMillisDoc)
+ .define(MetadataMaxRetentionBytesProp, LONG, Defaults.LogRetentionBytes,
null, HIGH, MetadataMaxRetentionBytesDoc)
+ .define(MetadataMaxRetentionMillisProp, LONG, Defaults.LogRetentionHours
* 60 * 60 * 1000L, null, HIGH, MetadataMaxRetentionMillisDoc)
/************* Authorizer Configuration ***********/
.define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName,
LOW, AuthorizerClassNameDoc)
@@ -1523,6 +1540,12 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog:
Boolean, dynamicConfigO
}
}
+ def metadataLogSegmentBytes = getInt(KafkaConfig.MetadataLogSegmentBytesProp)
+ def metadataLogSegmentMillis =
getLong(KafkaConfig.MetadataLogSegmentMillisProp)
+ def metadataRetentionBytes =
getLong(KafkaConfig.MetadataMaxRetentionBytesProp)
+ def metadataRetentionMillis =
getLong(KafkaConfig.MetadataMaxRetentionMillisProp)
+
+
def numNetworkThreads = getInt(KafkaConfig.NumNetworkThreadsProp)
def backgroundThreads = getInt(KafkaConfig.BackgroundThreadsProp)
val queuedMaxRequests = getInt(KafkaConfig.QueuedMaxRequestsProp)
diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
index 1d0c8d8..fb477f3 100644
--- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
+++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -16,25 +16,27 @@
*/
package kafka.raft
-import java.io.File
-import java.nio.ByteBuffer
-import java.nio.file.{Files, Path}
-import java.util.{Collections, Optional}
-import kafka.log.Log
+import kafka.log.{Defaults, Log, SegmentDeletion}
import kafka.server.KafkaRaftServer
import kafka.utils.{MockTime, TestUtils}
-import org.apache.kafka.common.errors.{OffsetOutOfRangeException,
RecordTooLargeException}
+import org.apache.kafka.common.errors.RecordTooLargeException
import org.apache.kafka.common.protocol
import org.apache.kafka.common.protocol.{ObjectSerializationCache, Writable}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords,
SimpleRecord}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.raft.internals.BatchBuilder
-import org.apache.kafka.raft.{KafkaRaftClient, LogAppendInfo,
LogOffsetMetadata, OffsetAndEpoch, ReplicatedLog, ValidOffsetAndEpoch}
+import org.apache.kafka.raft._
import org.apache.kafka.server.common.serialization.RecordSerde
-import org.apache.kafka.snapshot.{SnapshotPath, Snapshots}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse,
assertNotEquals, assertThrows, assertTrue}
+import org.apache.kafka.snapshot.{RawSnapshotReader, RawSnapshotWriter,
SnapshotPath, Snapshots}
+import org.apache.kafka.test.TestUtils.assertOptional
+import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import java.io.File
+import java.nio.ByteBuffer
+import java.nio.file.{Files, Path}
+import java.util.{Collections, Optional}
+
final class KafkaMetadataLogTest {
import KafkaMetadataLogTest._
@@ -158,27 +160,42 @@ final class KafkaMetadataLogTest {
def testCreateSnapshotBeforeLogStartOffset(): Unit = {
val numberOfRecords = 10
val epoch = 1
- val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
+ val snapshotId = new OffsetAndEpoch(numberOfRecords-4, epoch)
val log = buildMetadataLog(tempDir, mockTime)
append(log, numberOfRecords, epoch)
log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
-
TestUtils.resource(log.createNewSnapshot(snapshotId).get()) { snapshot =>
snapshot.freeze()
}
- assertTrue(log.deleteBeforeSnapshot(snapshotId))
- assertEquals(snapshotId.offset, log.startOffset)
+ // Simulate log cleanup that advances the LSO
+ log.log.maybeIncrementLogStartOffset(snapshotId.offset - 1,
SegmentDeletion)
assertThrows(
classOf[IllegalArgumentException],
- () => log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset - 1,
snapshotId.epoch))
+ () => log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset - 2,
snapshotId.epoch))
)
}
@Test
- def testCreateSnapshotMuchEalierEpoch(): Unit = {
+ def testCreateSnapshotDivergingEpoch(): Unit = {
+ val numberOfRecords = 10
+ val epoch = 2
+ val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
+ val log = buildMetadataLog(tempDir, mockTime)
+
+ append(log, numberOfRecords, epoch)
+ log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
+
+ assertThrows(
+ classOf[IllegalArgumentException],
+ () => log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset,
snapshotId.epoch - 1))
+ )
+ }
+
+ @Test
+ def testCreateSnapshotOlderEpoch(): Unit = {
val numberOfRecords = 10
val epoch = 2
val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
@@ -191,9 +208,6 @@ final class KafkaMetadataLogTest {
snapshot.freeze()
}
- assertTrue(log.deleteBeforeSnapshot(snapshotId))
- assertEquals(snapshotId.offset, log.startOffset)
-
assertThrows(
classOf[IllegalArgumentException],
() => log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset,
snapshotId.epoch - 1))
@@ -243,9 +257,8 @@ final class KafkaMetadataLogTest {
snapshot.freeze()
}
- assertTrue(log.deleteBeforeSnapshot(snapshotId))
- assertEquals(snapshotId.offset, log.startOffset)
- assertEquals(Optional.empty(), log.createNewSnapshot(snapshotId))
+ assertEquals(Optional.empty(), log.createNewSnapshot(snapshotId),
+ "Creating an existing snapshot should not do anything")
}
@Test
@@ -263,71 +276,7 @@ final class KafkaMetadataLogTest {
}
@Test
- def testUpdateLogStartOffset(): Unit = {
- val log = buildMetadataLog(tempDir, mockTime)
- val offset = 10
- val epoch = 0
- val snapshotId = new OffsetAndEpoch(offset, epoch)
-
- append(log, offset, epoch)
- log.updateHighWatermark(new LogOffsetMetadata(offset))
-
- TestUtils.resource(log.createNewSnapshot(snapshotId).get()) { snapshot =>
- snapshot.freeze()
- }
-
- assertTrue(log.deleteBeforeSnapshot(snapshotId))
- assertEquals(offset, log.startOffset)
- assertEquals(epoch, log.lastFetchedEpoch)
- assertEquals(offset, log.endOffset().offset)
- assertEquals(offset, log.highWatermark.offset)
-
- val newRecords = 10
- append(log, newRecords, epoch + 1)
- // Start offset should not change since a new snapshot was not generated
- assertFalse(log.deleteBeforeSnapshot(new OffsetAndEpoch(offset +
newRecords, epoch)))
- assertEquals(offset, log.startOffset)
-
- assertEquals(epoch + 1, log.lastFetchedEpoch)
- assertEquals(offset + newRecords, log.endOffset().offset)
- assertEquals(offset, log.highWatermark.offset)
- }
-
- @Test
- def testUpdateLogStartOffsetWillRemoveOlderSnapshot(): Unit = {
- val (logDir, log) = buildMetadataLogAndDir(tempDir, mockTime)
- val offset = 10
- val epoch = 0
-
- append(log, offset, epoch)
- val oldSnapshotId = new OffsetAndEpoch(offset, epoch)
- TestUtils.resource(log.storeSnapshot(oldSnapshotId).get()) { snapshot =>
- snapshot.freeze()
- }
-
- append(log, offset, epoch)
- val newSnapshotId = new OffsetAndEpoch(offset * 2, epoch)
- TestUtils.resource(log.storeSnapshot(newSnapshotId).get()) { snapshot =>
- snapshot.freeze()
- }
-
- log.updateHighWatermark(new LogOffsetMetadata(offset * 2))
- assertTrue(log.deleteBeforeSnapshot(newSnapshotId))
- log.close()
-
- mockTime.sleep(log.fileDeleteDelayMs)
- // Assert that the log dir doesn't contain any older snapshots
- Files
- .walk(logDir, 1)
- .map[Optional[SnapshotPath]](Snapshots.parse)
- .filter(_.isPresent)
- .forEach { path =>
- assertFalse(path.get.snapshotId.offset < log.startOffset)
- }
- }
-
- @Test
- def testUpdateLogStartOffsetWithMissingSnapshot(): Unit = {
+ def testDeleteNonExistentSnapshot(): Unit = {
val log = buildMetadataLog(tempDir, mockTime)
val offset = 10
val epoch = 0
@@ -335,7 +284,7 @@ final class KafkaMetadataLogTest {
append(log, offset, epoch)
log.updateHighWatermark(new LogOffsetMetadata(offset))
- assertFalse(log.deleteBeforeSnapshot(new OffsetAndEpoch(1L, epoch)))
+ assertFalse(log.deleteBeforeSnapshot(new OffsetAndEpoch(2L, epoch)))
assertEquals(0, log.startOffset)
assertEquals(epoch, log.lastFetchedEpoch)
assertEquals(offset, log.endOffset().offset)
@@ -343,26 +292,6 @@ final class KafkaMetadataLogTest {
}
@Test
- def testFailToIncreaseLogStartPastHighWatermark(): Unit = {
- val log = buildMetadataLog(tempDir, mockTime)
- val offset = 10
- val epoch = 0
- val snapshotId = new OffsetAndEpoch(2 * offset, 1 + epoch)
-
- append(log, offset, epoch)
- log.updateHighWatermark(new LogOffsetMetadata(offset))
-
- TestUtils.resource(log.storeSnapshot(snapshotId).get()) { snapshot =>
- snapshot.freeze()
- }
-
- assertThrows(
- classOf[OffsetOutOfRangeException],
- () => log.deleteBeforeSnapshot(snapshotId)
- )
- }
-
- @Test
def testTruncateFullyToLatestSnapshot(): Unit = {
val log = buildMetadataLog(tempDir, mockTime)
val numberOfRecords = 10
@@ -398,8 +327,7 @@ final class KafkaMetadataLogTest {
@Test
def testTruncateWillRemoveOlderSnapshot(): Unit = {
-
- val (logDir, log) = buildMetadataLogAndDir(tempDir, mockTime)
+ val (logDir, log, config) = buildMetadataLogAndDir(tempDir, mockTime)
val numberOfRecords = 10
val epoch = 1
@@ -432,7 +360,7 @@ final class KafkaMetadataLogTest {
assertEquals(log.earliestSnapshotId(), log.latestSnapshotId())
log.close()
- mockTime.sleep(log.fileDeleteDelayMs)
+ mockTime.sleep(config.fileDeleteDelayMs)
// Assert that the log dir doesn't contain any older snapshots
Files
.walk(logDir, 1)
@@ -470,7 +398,7 @@ final class KafkaMetadataLogTest {
@Test
def testCleanupPartialSnapshots(): Unit = {
- val (logDir, log) = buildMetadataLogAndDir(tempDir, mockTime)
+ val (logDir, log, config) = buildMetadataLogAndDir(tempDir, mockTime)
val numberOfRecords = 10
val epoch = 1
val snapshotId = new OffsetAndEpoch(1, epoch)
@@ -507,7 +435,7 @@ final class KafkaMetadataLogTest {
@Test
def testCleanupOlderSnapshots(): Unit = {
- val (logDir, log) = buildMetadataLogAndDir(tempDir, mockTime)
+ val (logDir, log, config) = buildMetadataLogAndDir(tempDir, mockTime)
val numberOfRecords = 10
val epoch = 1
@@ -542,7 +470,7 @@ final class KafkaMetadataLogTest {
assertEquals(greaterSnapshotId, secondLog.latestSnapshotId().get)
assertEquals(3 * numberOfRecords, secondLog.startOffset)
assertEquals(epoch, secondLog.lastFetchedEpoch)
- mockTime.sleep(log.fileDeleteDelayMs)
+ mockTime.sleep(config.fileDeleteDelayMs)
// Assert that the log dir doesn't contain any older snapshots
Files
@@ -582,7 +510,7 @@ final class KafkaMetadataLogTest {
val leaderEpoch = 5
val maxBatchSizeInBytes = 16384
val recordSize = 64
- val log = buildMetadataLog(tempDir, mockTime, maxBatchSizeInBytes)
+ val log = buildMetadataLog(tempDir, mockTime,
DefaultMetadataLogConfig.copy(maxBatchSizeInBytes = maxBatchSizeInBytes))
val oversizeBatch = buildFullBatch(leaderEpoch, recordSize,
maxBatchSizeInBytes + recordSize)
assertThrows(classOf[RecordTooLargeException], () => {
@@ -661,10 +589,9 @@ final class KafkaMetadataLogTest {
log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
- TestUtils.resource(log.storeSnapshot(snapshotId).get()) { snapshot =>
+ TestUtils.resource(log.createNewSnapshot(snapshotId).get()) { snapshot =>
snapshot.freeze()
}
- assertTrue(log.deleteBeforeSnapshot(snapshotId))
val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords,
epoch - 1)
assertEquals(ValidOffsetAndEpoch.Kind.SNAPSHOT, resultOffsetAndEpoch.kind)
@@ -682,10 +609,11 @@ final class KafkaMetadataLogTest {
log.updateHighWatermark(new LogOffsetMetadata(offset))
val snapshotId = new OffsetAndEpoch(offset, epoch)
- TestUtils.resource(log.storeSnapshot(snapshotId).get()) { snapshot =>
+ TestUtils.resource(log.createNewSnapshot(snapshotId).get()) { snapshot =>
snapshot.freeze()
}
- assertTrue(log.deleteBeforeSnapshot(snapshotId))
+ // Simulate log cleaning advancing the LSO
+ log.log.maybeIncrementLogStartOffset(offset, SegmentDeletion);
val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset - 1, epoch)
assertEquals(ValidOffsetAndEpoch.Kind.SNAPSHOT, resultOffsetAndEpoch.kind)
@@ -703,10 +631,9 @@ final class KafkaMetadataLogTest {
log.updateHighWatermark(new LogOffsetMetadata(offset))
val snapshotId = new OffsetAndEpoch(offset, epoch)
- TestUtils.resource(log.storeSnapshot(snapshotId).get()) { snapshot =>
+ TestUtils.resource(log.createNewSnapshot(snapshotId).get()) { snapshot =>
snapshot.freeze()
}
- assertTrue(log.deleteBeforeSnapshot(snapshotId))
val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch)
assertEquals(ValidOffsetAndEpoch.Kind.VALID, resultOffsetAndEpoch.kind)
@@ -800,6 +727,125 @@ final class KafkaMetadataLogTest {
assertEquals(ValidOffsetAndEpoch.Kind.VALID, resultOffsetAndEpoch.kind)
assertEquals(new OffsetAndEpoch(numberOfRecords - 1, epoch),
resultOffsetAndEpoch.offsetAndEpoch())
}
+
+ @Test
+ def testAdvanceLogStartOffsetAfterCleaning(): Unit = {
+ val config = MetadataLogConfig(
+ logSegmentBytes = 512,
+ logSegmentMillis = 10 * 1000,
+ retentionMaxBytes = 256,
+ retentionMillis = 60 * 1000,
+ maxBatchSizeInBytes = 512,
+ maxFetchSizeInBytes = DefaultMetadataLogConfig.maxFetchSizeInBytes,
+ fileDeleteDelayMs = Defaults.FileDeleteDelayMs,
+ nodeId = 1
+ )
+ val log = buildMetadataLog(tempDir, mockTime, config)
+
+ // Generate some segments
+ for(_ <- 0 to 100) {
+ append(log, 47, 1) // An odd number of records to avoid offset alignment
+ }
+ assertFalse(log.maybeClean(), "Should not clean since HW was still 0")
+
+ log.updateHighWatermark(new LogOffsetMetadata(4000))
+ assertFalse(log.maybeClean(), "Should not clean since no snapshots exist")
+
+ val snapshotId1 = new OffsetAndEpoch(1000, 1)
+ TestUtils.resource(log.storeSnapshot(snapshotId1).get()) { snapshot =>
+ append(snapshot, 100)
+ snapshot.freeze()
+ }
+
+ val snapshotId2 = new OffsetAndEpoch(2000, 1)
+ TestUtils.resource(log.storeSnapshot(snapshotId2).get()) { snapshot =>
+ append(snapshot, 100)
+ snapshot.freeze()
+ }
+
+ val lsoBefore = log.startOffset()
+ assertTrue(log.maybeClean(), "Expected to clean since there was at least
one snapshot")
+ val lsoAfter = log.startOffset()
+ assertTrue(lsoAfter > lsoBefore, "Log Start Offset should have increased
after cleaning")
+ assertTrue(lsoAfter == snapshotId2.offset, "Expected the Log Start Offset
to equal the offset of the latest snapshot")
+ }
+
+ @Test
+ def testDeleteSnapshots(): Unit = {
+ // Generate some logs and a few snapshots, set retention low and verify
that cleaning occurs
+ val config = DefaultMetadataLogConfig.copy(
+ logSegmentBytes = 1024,
+ logSegmentMillis = 10 * 1000,
+ retentionMaxBytes = 1024,
+ retentionMillis = 60 * 1000,
+ maxBatchSizeInBytes = 100
+ )
+ val log = buildMetadataLog(tempDir, mockTime, config)
+
+ for(_ <- 0 to 1000) {
+ append(log, 1, 1)
+ }
+ log.updateHighWatermark(new LogOffsetMetadata(1001))
+
+ for(offset <- Seq(100, 200, 300, 400, 500, 600)) {
+ val snapshotId = new OffsetAndEpoch(offset, 1)
+ TestUtils.resource(log.storeSnapshot(snapshotId).get()) { snapshot =>
+ append(snapshot, 10)
+ snapshot.freeze()
+ }
+ }
+
+ assertEquals(6, log.snapshotCount())
+ assertTrue(log.maybeClean())
+ assertEquals(1, log.snapshotCount(), "Expected only one snapshot after
cleaning")
+ assertOptional(log.latestSnapshotId(), (snapshotId: OffsetAndEpoch) => {
+ assertEquals(600, snapshotId.offset)
+ })
+ assertEquals(600, log.startOffset)
+ }
+
+ @Test
+ def testSoftRetentionLimit(): Unit = {
+ // Set retention equal to the segment size and generate slightly more than
one segment of logs
+ val config = DefaultMetadataLogConfig.copy(
+ logSegmentBytes = 10240,
+ logSegmentMillis = 10 * 1000,
+ retentionMaxBytes = 10240,
+ retentionMillis = 60 * 1000,
+ maxBatchSizeInBytes = 100
+ )
+ val log = buildMetadataLog(tempDir, mockTime, config)
+
+ for(_ <- 0 to 2000) {
+ append(log, 1, 1)
+ }
+ log.updateHighWatermark(new LogOffsetMetadata(2000))
+
+ // Then generate two snapshots
+ val snapshotId1 = new OffsetAndEpoch(1000, 1)
+ TestUtils.resource(log.storeSnapshot(snapshotId1).get()) { snapshot =>
+ append(snapshot, 500)
+ snapshot.freeze()
+ }
+
+ // ... and a second snapshot at the high watermark
+ val snapshotId2 = new OffsetAndEpoch(2000, 1)
+ TestUtils.resource(log.storeSnapshot(snapshotId2).get()) { snapshot =>
+ append(snapshot, 500)
+ snapshot.freeze()
+ }
+
+ // Cleaning should occur, but resulting size will not be under retention
limit since we have to keep one snapshot
+ assertTrue(log.maybeClean())
+ assertEquals(1, log.snapshotCount(), "Expected one snapshot after
cleaning")
+ assertOptional(log.latestSnapshotId(), (snapshotId: OffsetAndEpoch) => {
+ assertEquals(2000, snapshotId.offset, "Unexpected offset for latest
snapshot")
+ assertOptional(log.readSnapshot(snapshotId), (reader: RawSnapshotReader)
=> {
+ assertTrue(reader.sizeInBytes() + log.log.size >
config.retentionMaxBytes)
+ })
+ })
+ }
}
object KafkaMetadataLogTest {
@@ -817,12 +863,22 @@ object KafkaMetadataLogTest {
}
}
+ val DefaultMetadataLogConfig = MetadataLogConfig(
+ logSegmentBytes = 100 * 1024,
+ logSegmentMillis = 10 * 1000,
+ retentionMaxBytes = 100 * 1024,
+ retentionMillis = 60 * 1000,
+ maxBatchSizeInBytes = KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
+ maxFetchSizeInBytes = KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
+ fileDeleteDelayMs = Defaults.FileDeleteDelayMs,
+ nodeId = 1
+ )
+
def buildMetadataLogAndDir(
tempDir: File,
time: MockTime,
- maxBatchSizeInBytes: Int = KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
- maxFetchSizeInBytes: Int = KafkaRaftClient.MAX_FETCH_SIZE_BYTES
- ): (Path, KafkaMetadataLog) = {
+ metadataLogConfig: MetadataLogConfig = DefaultMetadataLogConfig
+ ): (Path, KafkaMetadataLog, MetadataLogConfig) = {
val logDir = createLogDirectory(
tempDir,
@@ -835,20 +891,18 @@ object KafkaMetadataLogTest {
logDir,
time,
time.scheduler,
- maxBatchSizeInBytes,
- maxFetchSizeInBytes
+ metadataLogConfig
)
- (logDir.toPath, metadataLog)
+ (logDir.toPath, metadataLog, metadataLogConfig)
}
def buildMetadataLog(
tempDir: File,
time: MockTime,
- maxBatchSizeInBytes: Int = KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
- maxFetchSizeInBytes: Int = KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+ metadataLogConfig: MetadataLogConfig = DefaultMetadataLogConfig,
): KafkaMetadataLog = {
- val (_, log) = buildMetadataLogAndDir(tempDir, time, maxBatchSizeInBytes,
maxFetchSizeInBytes)
+ val (_, log, _) = buildMetadataLogAndDir(tempDir, time, metadataLogConfig)
log
}
@@ -864,6 +918,15 @@ object KafkaMetadataLogTest {
)
}
+ def append(snapshotWriter: RawSnapshotWriter, numberOfRecords: Int): Unit = {
+ snapshotWriter.append(MemoryRecords.withRecords(
+ 0,
+ CompressionType.NONE,
+ 0,
+ (0 until numberOfRecords).map(number => new
SimpleRecord(number.toString.getBytes)): _*
+ ))
+ }
+
private def createLogDirectory(logDir: File, logDirName: String): File = {
val logDirPath = logDir.getAbsolutePath
val dir = new File(logDirPath, logDirName)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 7241bf9..b555b01 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -603,6 +603,10 @@ class KafkaConfigTest {
case KafkaConfig.BrokerSessionTimeoutMsProp =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.NodeIdProp => assertPropertyInvalid(baseProperties,
name, "not_a_number")
case KafkaConfig.MetadataLogDirProp => // ignore string
+ case KafkaConfig.MetadataLogSegmentBytesProp =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
+ case KafkaConfig.MetadataLogSegmentMillisProp =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
+ case KafkaConfig.MetadataMaxRetentionBytesProp =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
+ case KafkaConfig.MetadataMaxRetentionMillisProp =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.ControllerListenerNamesProp => // ignore string
case KafkaConfig.AuthorizerClassNameProp => //ignore string
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 9e53d2c..10a4488 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -160,6 +160,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
private final KafkaRaftMetrics kafkaRaftMetrics;
private final QuorumState quorum;
private final RequestManager requestManager;
+ private final RaftMetadataLogCleanerManager snapshotCleaner;
private final List<ListenerContext> listenerContexts = new ArrayList<>();
private final ConcurrentLinkedQueue<Listener<T>> pendingListeners = new
ConcurrentLinkedQueue<>();
@@ -230,7 +231,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
this.logger = logContext.logger(KafkaRaftClient.class);
this.random = random;
this.raftConfig = raftConfig;
-
+ this.snapshotCleaner = new RaftMetadataLogCleanerManager(logger, time,
60000, log::maybeClean);
Set<Integer> quorumVoterIds = raftConfig.quorumVoterIds();
this.requestManager = new RequestManager(quorumVoterIds,
raftConfig.retryBackoffMs(),
raftConfig.requestTimeoutMs(), random);
@@ -2091,8 +2092,6 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
}
private long pollCurrentState(long currentTimeMs) {
- maybeDeleteBeforeSnapshot();
-
if (quorum.isLeader()) {
return pollLeader(currentTimeMs);
} else if (quorum.isCandidate()) {
@@ -2155,14 +2154,34 @@ public class KafkaRaftClient<T> implements
RaftClient<T> {
return false;
}
- private void maybeDeleteBeforeSnapshot() {
- log.latestSnapshotId().ifPresent(snapshotId -> {
- quorum.highWatermark().ifPresent(highWatermark -> {
- if (highWatermark.offset >= snapshotId.offset) {
- log.deleteBeforeSnapshot(snapshotId);
+ /**
+ * A simple timer-based log cleaner
+ */
+ private static class RaftMetadataLogCleanerManager {
+ private final Logger logger;
+ private final Timer timer;
+ private final long delayMs;
+ private final Runnable cleaner;
+
+ RaftMetadataLogCleanerManager(Logger logger, Time time, long delayMs,
Runnable cleaner) {
+ this.logger = logger;
+ this.timer = time.timer(delayMs);
+ this.delayMs = delayMs;
+ this.cleaner = cleaner;
+ }
+
+ public long maybeClean(long currentTimeMs) {
+ timer.update(currentTimeMs);
+ if (timer.isExpired()) {
+ try {
+ cleaner.run();
+ } catch (Throwable t) {
+ logger.error("Had an error during log cleaning", t);
}
- });
- });
+ timer.reset(delayMs);
+ }
+ return timer.remainingMs();
+ }
}
private void wakeup() {
@@ -2191,7 +2210,10 @@ public class KafkaRaftClient<T> implements RaftClient<T>
{
return;
}
- long pollTimeoutMs = pollCurrentState(currentTimeMs);
+ long pollStateTimeoutMs = pollCurrentState(currentTimeMs);
+ long cleaningTimeoutMs = snapshotCleaner.maybeClean(currentTimeMs);
+ long pollTimeoutMs = Math.min(pollStateTimeoutMs, cleaningTimeoutMs);
+
kafkaRaftMetrics.updatePollStart(currentTimeMs);
RaftMessage message = messageQueue.poll(pollTimeoutMs);
diff --git a/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
b/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
index 01c6aa4..1b49b3a 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
@@ -175,12 +175,12 @@ public interface ReplicatedLog extends AutoCloseable {
void updateHighWatermark(LogOffsetMetadata offsetMetadata);
/**
- * Updates the log start offset and delete segments if necessary.
+ * Delete all snapshots prior to the given snapshot
*
* The replicated log's start offset can be increased and older segments
can be deleted when
* there is a snapshot greater than the current log start offset.
*/
- boolean deleteBeforeSnapshot(OffsetAndEpoch logStartSnapshotId);
+ boolean deleteBeforeSnapshot(OffsetAndEpoch snapshotId);
/**
* Flush the current log to disk.
@@ -188,6 +188,11 @@ public interface ReplicatedLog extends AutoCloseable {
void flush();
/**
+ * Possibly perform cleaning of snapshots and logs
+ */
+ boolean maybeClean();
+
+ /**
* Get the last offset which has been flushed to disk.
*/
long lastFlushedOffset();
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
b/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
index ce0c7dd..a4d3b5a 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
@@ -116,7 +116,13 @@ public final class Snapshots {
Path immutablePath = snapshotPath(logDir, snapshotId);
Path deletedPath = deleteRename(immutablePath, snapshotId);
try {
- return Files.deleteIfExists(immutablePath) |
Files.deleteIfExists(deletedPath);
+ boolean deleted = Files.deleteIfExists(immutablePath) |
Files.deleteIfExists(deletedPath);
+ if (deleted) {
+ log.info("Deleted snapshot files for snapshot {}.",
snapshotId);
+ } else {
+ log.info("Did not delete snapshot files for snapshot {} since
they did not exist.", snapshotId);
+ }
+ return deleted;
} catch (IOException e) {
log.error("Error deleting snapshot files {} and {}",
immutablePath, deletedPath, e);
return false;
diff --git
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
index df9da9d..04b5676 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
@@ -49,7 +49,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
final public class KafkaRaftClientSnapshotTest {
@Test
- public void testLeaderListernerNotified() throws Exception {
+ public void testLeaderListenerNotified() throws Exception {
int localId = 0;
int otherNodeId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@@ -128,6 +128,7 @@ final public class KafkaRaftClientSnapshotTest {
.appendToLog(snapshotId.epoch, Arrays.asList("a", "b", "c"))
.appendToLog(snapshotId.epoch, Arrays.asList("d", "e", "f"))
.withEmptySnapshot(snapshotId)
+ .deleteBeforeSnapshot(snapshotId)
.withElectedLeader(epoch, leaderId)
.build();
@@ -196,6 +197,7 @@ final public class KafkaRaftClientSnapshotTest {
assertEquals(secondSnapshotId, snapshot.snapshotId());
snapshot.freeze();
}
+ context.log.deleteBeforeSnapshot(secondSnapshotId);
context.client.poll();
// Resume the listener from reading commit batches
@@ -244,11 +246,9 @@ final public class KafkaRaftClientSnapshotTest {
assertEquals(snapshotId, snapshot.snapshotId());
snapshot.freeze();
}
-
+ context.log.deleteBeforeSnapshot(snapshotId);
context.client.poll();
- assertEquals(snapshotId.offset, context.log.startOffset());
-
// Send Fetch request less than start offset
context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 0,
epoch, 0));
context.pollUntilResponse();
@@ -425,6 +425,7 @@ final public class KafkaRaftClientSnapshotTest {
assertEquals(oldestSnapshotId, snapshot.snapshotId());
snapshot.freeze();
}
+ context.log.deleteBeforeSnapshot(oldestSnapshotId);
context.client.poll();
// Send fetch with log start offset and invalid last fetched epoch
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLog.java
b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
index 3563016..fb2168b 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLog.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
@@ -317,6 +317,11 @@ public class MockLog implements ReplicatedLog {
}
@Override
+ public boolean maybeClean() {
+ return false;
+ }
+
+ @Override
public long lastFlushedOffset() {
return lastFlushedOffset;
}