jsancio commented on code in PR #12653:
URL: https://github.com/apache/kafka/pull/12653#discussion_r973463543


##########
core/src/main/scala/kafka/raft/KafkaMetadataLog.scala:
##########
@@ -609,29 +609,48 @@ object KafkaMetadataLog {
   private def recoverSnapshots(
     log: UnifiedLog
   ): mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] = {
-    val snapshots = mutable.TreeMap.empty[OffsetAndEpoch, 
Option[FileRawSnapshotReader]]
+    val snapshotsToRetain = mutable.TreeMap.empty[OffsetAndEpoch, 
Option[FileRawSnapshotReader]]
+    val snapshotsToDelete = mutable.Buffer.empty[SnapshotPath]
+
     // Scan the log directory; deleting partial snapshots and older snapshot, 
only remembering immutable snapshots start
     // from logStartOffset
     val filesInDir = Files.newDirectoryStream(log.dir.toPath)
 
     try {
       filesInDir.forEach { path =>
         Snapshots.parse(path).ifPresent { snapshotPath =>
-          if (snapshotPath.partial ||
-            snapshotPath.deleted ||
-            snapshotPath.snapshotId.offset < log.logStartOffset) {
-            // Delete partial snapshot, deleted snapshot and older snapshot
-            Files.deleteIfExists(snapshotPath.path)
+          // Collect partial snapshot, deleted snapshot and older snapshot for 
deletion
+          if (snapshotPath.partial
+            || snapshotPath.deleted
+            || snapshotPath.snapshotId.offset < log.logStartOffset) {
+            snapshotsToDelete.append(snapshotPath)
           } else {
-            snapshots.put(snapshotPath.snapshotId, None)
+            snapshotsToRetain.put(snapshotPath.snapshotId, None)
           }
         }
       }
+
+      // Before deleting any snapshots, we should ensure that the retained 
snapshots are
+      // consistent with the current state of the log. If the log start offset 
is not 0,
+      // then we must have a snapshot which covers the initial state up to the 
current
+      // log start offset.
+      if (log.logStartOffset > 0) {
+        val latestSnapshotId = snapshotsToRetain.lastOption.map(_._1)
+        if (!latestSnapshotId.exists(snapshotId => snapshotId.offset >= 
log.logStartOffset)) {
+          throw new IllegalStateException("Inconsistent snapshot state: there 
must be a snapshot " +
+            s"at an offset larger then the current log start offset 
${log.logStartOffset}, but the " +
+            s"latest snapshot is $latestSnapshotId")
+        }
+      }
+
+      snapshotsToDelete.foreach { snapshotPath =>
+        Files.deleteIfExists(snapshotPath.path)

Review Comment:
   Can we add an `INFO`-level log message for each snapshot that gets deleted here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to