This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 0c08c80afa KAFKA-14240; Validate kraft snapshot state on startup (#12653)
0c08c80afa is described below
commit 0c08c80afa8a00aa520ec407d9bce6537395fc23
Author: Jason Gustafson <[email protected]>
AuthorDate: Mon Sep 19 11:52:48 2022 -0700
KAFKA-14240; Validate kraft snapshot state on startup (#12653)
We should prevent the metadata log from initializing in a known bad state.
If the log start offset of the first segment is greater than 0, then there must be a
snapshot at an offset greater than or equal to it in order to ensure that the
initialized state is complete.
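For illustration only (this sketch is not part of the patch; validateSnapshotState
and its parameters are hypothetical names), the invariant being enforced at
startup amounts to:

    // Minimal sketch of the startup invariant, assuming a simplified view of
    // the log state: a log that does not start at offset 0 must be covered by
    // a snapshot whose offset is at least the log start offset.
    def validateSnapshotState(
      logStartOffset: Long,
      latestSnapshotOffset: Option[Long]
    ): Unit = {
      if (logStartOffset > 0 && !latestSnapshotOffset.exists(_ >= logStartOffset)) {
        throw new IllegalStateException(
          s"Inconsistent snapshot state: expected a snapshot at an offset of at least " +
          s"the log start offset $logStartOffset, but the latest snapshot offset is " +
          s"$latestSnapshotOffset")
      }
    }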
Reviewers: José Armando García Sancio <[email protected]>
---
core/src/main/scala/kafka/log/LogLoader.scala | 7 +-
.../main/scala/kafka/raft/KafkaMetadataLog.scala | 43 ++++++++---
.../scala/kafka/raft/KafkaMetadataLogTest.scala | 85 +++++++++++++++++++++-
.../java/org/apache/kafka/snapshot/Snapshots.java | 15 ++--
.../org/apache/kafka/snapshot/SnapshotsTest.java | 6 +-
5 files changed, 132 insertions(+), 24 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogLoader.scala b/core/src/main/scala/kafka/log/LogLoader.scala
index 25ee89c72b..f8da67656f 100644
--- a/core/src/main/scala/kafka/log/LogLoader.scala
+++ b/core/src/main/scala/kafka/log/LogLoader.scala
@@ -19,7 +19,6 @@ package kafka.log
import java.io.{File, IOException}
import java.nio.file.{Files, NoSuchFileException}
-
import kafka.common.LogSegmentOffsetOverflowException
import kafka.log.UnifiedLog.{CleanedFileSuffix, DeletedFileSuffix, SwapFileSuffix, isIndexFile, isLogFile, offsetFromFile}
import kafka.server.{LogDirFailureChannel, LogOffsetMetadata}
@@ -28,6 +27,7 @@ import kafka.utils.{CoreUtils, Logging, Scheduler}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.InvalidOffsetException
import org.apache.kafka.common.utils.Time
+import org.apache.kafka.snapshot.Snapshots
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import scala.collection.{Set, mutable}
@@ -229,7 +229,10 @@ class LogLoader(
if (!file.canRead)
throw new IOException(s"Could not read file $file")
val filename = file.getName
- if (filename.endsWith(DeletedFileSuffix)) {
+
+ // Delete stray files marked for deletion, but skip KRaft snapshots.
+ // These are handled in the recovery logic in `KafkaMetadataLog`.
+ if (filename.endsWith(DeletedFileSuffix) && !filename.endsWith(Snapshots.DELETE_SUFFIX)) {
debug(s"Deleting stray temporary file ${file.getAbsolutePath}")
Files.deleteIfExists(file.toPath)
} else if (filename.endsWith(CleanedFileSuffix)) {
diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index d112f3b581..95d96b3399 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -16,7 +16,7 @@
*/
package kafka.raft
-import kafka.log.{AppendOrigin, Defaults, UnifiedLog, LogConfig, LogOffsetSnapshot, SnapshotGenerated}
+import kafka.log.{AppendOrigin, Defaults, LogConfig, LogOffsetSnapshot, SnapshotGenerated, UnifiedLog}
import kafka.server.KafkaConfig.{MetadataLogSegmentBytesProp, MetadataLogSegmentMinBytesProp}
import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, KafkaConfig, LogDirFailureChannel, RequestLocal}
import kafka.utils.{CoreUtils, Logging, Scheduler}
@@ -26,7 +26,7 @@ import org.apache.kafka.common.record.{ControlRecordUtils, MemoryRecords, Record
import org.apache.kafka.common.utils.{BufferSupplier, Time}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch}
-import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, Snapshots}
+import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
import java.io.File
import java.nio.file.{Files, NoSuchFileException, Path}
@@ -546,7 +546,7 @@ case class MetadataLogConfig(logSegmentBytes: Int,
fileDeleteDelayMs: Int,
nodeId: Int)
-object KafkaMetadataLog {
+object KafkaMetadataLog extends Logging {
def apply(
topicPartition: TopicPartition,
topicId: Uuid,
@@ -623,7 +623,9 @@ object KafkaMetadataLog {
private def recoverSnapshots(
log: UnifiedLog
): mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] = {
- val snapshots = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+ val snapshotsToRetain = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+ val snapshotsToDelete = mutable.Buffer.empty[SnapshotPath]
+
// Scan the log directory; deleting partial snapshots and older snapshot, only remembering immutable snapshots start
// from logStartOffset
val filesInDir = Files.newDirectoryStream(log.dir.toPath)
@@ -631,21 +633,40 @@ object KafkaMetadataLog {
try {
filesInDir.forEach { path =>
Snapshots.parse(path).ifPresent { snapshotPath =>
- if (snapshotPath.partial ||
- snapshotPath.deleted ||
- snapshotPath.snapshotId.offset < log.logStartOffset) {
- // Delete partial snapshot, deleted snapshot and older snapshot
- Files.deleteIfExists(snapshotPath.path)
+ // Collect partial snapshot, deleted snapshot and older snapshot for deletion
+ if (snapshotPath.partial
+ || snapshotPath.deleted
+ || snapshotPath.snapshotId.offset < log.logStartOffset) {
+ snapshotsToDelete.append(snapshotPath)
} else {
- snapshots.put(snapshotPath.snapshotId, None)
+ snapshotsToRetain.put(snapshotPath.snapshotId, None)
}
}
}
+
+ // Before deleting any snapshots, we should ensure that the retained snapshots are
+ // consistent with the current state of the log. If the log start offset is not 0,
+ // then we must have a snapshot which covers the initial state up to the current
+ // log start offset.
+ if (log.logStartOffset > 0) {
+ val latestSnapshotId = snapshotsToRetain.lastOption.map(_._1)
+ if (!latestSnapshotId.exists(snapshotId => snapshotId.offset >= log.logStartOffset)) {
+ throw new IllegalStateException("Inconsistent snapshot state: there must be a snapshot " +
+ s"at an offset larger than the current log start offset ${log.logStartOffset}, but the " +
+ s"latest snapshot is $latestSnapshotId")
+ }
+ }
+
+ snapshotsToDelete.foreach { snapshotPath =>
+ Files.deleteIfExists(snapshotPath.path)
+ info(s"Deleted unneeded snapshot file with path $snapshotPath")
+ }
} finally {
filesInDir.close()
}
- snapshots
+ info(s"Initialized snapshots with IDs ${snapshotsToRetain.keys} from
${log.dir}")
+ snapshotsToRetain
}
private def deleteSnapshotFiles(
diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
index 49b1206606..fa9885e358 100644
--- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
+++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -16,7 +16,7 @@
*/
package kafka.raft
-import kafka.log.{Defaults, UnifiedLog, SegmentDeletion}
+import kafka.log.{Defaults, SegmentDeletion, UnifiedLog}
import kafka.server.KafkaConfig.{MetadataLogSegmentBytesProp, MetadataLogSegmentMillisProp, MetadataLogSegmentMinBytesProp, NodeIdProp, ProcessRolesProp, QuorumVotersProp}
import kafka.server.{KafkaConfig, KafkaRaftServer}
import kafka.utils.{MockTime, TestUtils}
@@ -28,7 +28,7 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.raft.internals.BatchBuilder
import org.apache.kafka.raft._
import org.apache.kafka.server.common.serialization.RecordSerde
-import org.apache.kafka.snapshot.{RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
+import org.apache.kafka.snapshot.{FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
import org.apache.kafka.test.TestUtils.assertOptional
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -391,6 +391,87 @@ final class KafkaMetadataLogTest {
}
}
+ @Test
+ def testStartupWithInvalidSnapshotState(): Unit = {
+ // Initialize an empty log at offset 100.
+ var log = buildMetadataLog(tempDir, mockTime)
+ log.log.truncateFullyAndStartAt(newOffset = 100)
+ log.close()
+
+ val metadataDir = metadataLogDir(tempDir)
+ assertTrue(metadataDir.exists())
+
+ // Initialization should fail unless we have a snapshot at an offset
+ // greater than or equal to 100.
+ assertThrows(classOf[IllegalStateException], () => {
+ buildMetadataLog(tempDir, mockTime)
+ })
+ // Snapshots at offsets less than 100 are not sufficient.
+ writeEmptySnapshot(metadataDir, new OffsetAndEpoch(50, 1))
+ assertThrows(classOf[IllegalStateException], () => {
+ buildMetadataLog(tempDir, mockTime)
+ })
+
+ // Snapshot at offset 100 should be fine.
+ writeEmptySnapshot(metadataDir, new OffsetAndEpoch(100, 1))
+ log = buildMetadataLog(tempDir, mockTime)
+ log.log.truncateFullyAndStartAt(newOffset = 200)
+ log.close()
+
+ // Snapshots at higher offsets are also fine. In this case, the
+ // log start offset should advance to the first snapshot offset.
+ writeEmptySnapshot(metadataDir, new OffsetAndEpoch(500, 1))
+ log = buildMetadataLog(tempDir, mockTime)
+ assertEquals(500, log.log.logStartOffset)
+ }
+
+ @Test
+ def testSnapshotDeletionWithInvalidSnapshotState(): Unit = {
+ // Initialize an empty log at offset 100.
+ val log = buildMetadataLog(tempDir, mockTime)
+ log.log.truncateFullyAndStartAt(newOffset = 100)
+ log.close()
+
+ val metadataDir = metadataLogDir(tempDir)
+ assertTrue(metadataDir.exists())
+
+ // We have one deleted snapshot at an offset matching the start offset.
+ val snapshotId = new OffsetAndEpoch(100, 1)
+ writeEmptySnapshot(metadataDir, snapshotId)
+
+ val deletedPath = Snapshots.markForDelete(metadataDir.toPath, snapshotId)
+ assertTrue(deletedPath.toFile.exists())
+
+ // Initialization should still fail.
+ assertThrows(classOf[IllegalStateException], () => {
+ buildMetadataLog(tempDir, mockTime)
+ })
+
+ // The snapshot marked for deletion should still exist.
+ assertTrue(deletedPath.toFile.exists())
+ }
+
+ private def metadataLogDir(
+ logDir: File
+ ): File = {
+ new File(
+ logDir.getAbsolutePath,
+ UnifiedLog.logDirName(KafkaRaftServer.MetadataPartition)
+ )
+ }
+
+ private def writeEmptySnapshot(
+ metadataDir: File,
+ snapshotId: OffsetAndEpoch
+ ): Unit = {
+ val writer = FileRawSnapshotWriter.create(
+ metadataDir.toPath,
+ snapshotId,
+ Optional.empty()
+ )
+ TestUtils.resource(writer)(_.freeze())
+ }
+
@Test
def testDoesntTruncateFully(): Unit = {
val log = buildMetadataLog(tempDir, mockTime)
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java b/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
index 337e56a7f8..a41f6485fa 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
@@ -32,7 +32,7 @@ public final class Snapshots {
private static final Logger log = LoggerFactory.getLogger(Snapshots.class);
public static final String SUFFIX = ".checkpoint";
private static final String PARTIAL_SUFFIX = String.format("%s.part", SUFFIX);
- private static final String DELETE_SUFFIX = String.format("%s.deleted", SUFFIX);
+ public static final String DELETE_SUFFIX = String.format("%s.deleted", SUFFIX);
private static final NumberFormat OFFSET_FORMATTER = NumberFormat.getInstance();
private static final NumberFormat EPOCH_FORMATTER = NumberFormat.getInstance();
@@ -60,7 +60,7 @@ public final class Snapshots {
return source.resolveSibling(filenameFromSnapshotId(snapshotId) + SUFFIX);
}
- static Path deleteRename(Path source, OffsetAndEpoch snapshotId) {
+ static Path deleteRenamePath(Path source, OffsetAndEpoch snapshotId) {
return source.resolveSibling(filenameFromSnapshotId(snapshotId) + DELETE_SUFFIX);
}
@@ -114,7 +114,7 @@ public final class Snapshots {
*/
public static boolean deleteIfExists(Path logDir, OffsetAndEpoch snapshotId) {
Path immutablePath = snapshotPath(logDir, snapshotId);
- Path deletedPath = deleteRename(immutablePath, snapshotId);
+ Path deletedPath = deleteRenamePath(immutablePath, snapshotId);
try {
boolean deleted = Files.deleteIfExists(immutablePath) | Files.deleteIfExists(deletedPath);
if (deleted) {
@@ -130,13 +130,16 @@ public final class Snapshots {
}
/**
- * Mark a snapshot for deletion by renaming with the deleted suffix
+ * Mark a snapshot for deletion by renaming with the deleted suffix.
+ *
+ * @return the path of the snapshot marked for deletion (i.e. with .deleted suffix)
*/
- public static void markForDelete(Path logDir, OffsetAndEpoch snapshotId) {
+ public static Path markForDelete(Path logDir, OffsetAndEpoch snapshotId) {
Path immutablePath = snapshotPath(logDir, snapshotId);
- Path deletedPath = deleteRename(immutablePath, snapshotId);
+ Path deletedPath = deleteRenamePath(immutablePath, snapshotId);
try {
Utils.atomicMoveWithFallback(immutablePath, deletedPath, false);
+ return deletedPath;
} catch (IOException e) {
throw new UncheckedIOException(
String.format(
diff --git a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java
index ae89543e3d..57aca28220 100644
--- a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java
+++ b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java
@@ -75,7 +75,7 @@ final public class SnapshotsTest {
TestUtils.RANDOM.nextInt(Integer.MAX_VALUE)
);
Path path = Snapshots.snapshotPath(TestUtils.tempDirectory().toPath(), snapshotId);
- Path deletedPath = Snapshots.deleteRename(path, snapshotId);
+ Path deletedPath = Snapshots.deleteRenamePath(path, snapshotId);
SnapshotPath snapshotPath = Snapshots.parse(deletedPath).get();
assertEquals(snapshotId, snapshotPath.snapshotId);
@@ -116,11 +116,11 @@ final public class SnapshotsTest {
if (renameBeforeDeleting)
// rename snapshot before deleting
- Utils.atomicMoveWithFallback(snapshotPath, Snapshots.deleteRename(snapshotPath, snapshotId), false);
+ Utils.atomicMoveWithFallback(snapshotPath, Snapshots.deleteRenamePath(snapshotPath, snapshotId), false);
assertTrue(Snapshots.deleteIfExists(logDirPath, snapshot.snapshotId()));
assertFalse(Files.exists(snapshotPath));
- assertFalse(Files.exists(Snapshots.deleteRename(snapshotPath, snapshotId)));
+ assertFalse(Files.exists(Snapshots.deleteRenamePath(snapshotPath, snapshotId)));
}
}
}