This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new 5002715  KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs (#8812)
5002715 is described below

commit 5002715485482a8bffd04c05110a29ca98ab097c
Author: Ismael Juma <[email protected]>
AuthorDate: Fri Feb 26 14:40:46 2021 -0800

    KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs (#8812)
    
    1. Don't advance recovery point in `recoverLog` unless there was a clean
    shutdown.
    2. Ensure the recovery point is not ahead of the log end offset.
    3. Clear and flush leader epoch cache and truncate producer state manager
    if deleting segments due to log end offset being smaller than log start
    offset.
    4. If we are unable to delete a clean shutdown file that exists, mark the
    directory as offline (this was the intent, but the code was wrong).
    
    Updated one test that was failing after this change to verify the new behavior.
    
    Reviewers: Jun Rao <[email protected]>, Jason Gustafson <[email protected]>
---
 core/src/main/scala/kafka/log/Log.scala          | 46 +++++++++++++++++-------
 core/src/main/scala/kafka/log/LogManager.scala   |  2 +-
 core/src/test/scala/unit/kafka/log/LogTest.scala |  6 +++-
 3 files changed, 39 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala 
b/core/src/main/scala/kafka/log/Log.scala
index 2249b5e..2729154 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -849,9 +849,25 @@ class Log(@volatile private var _dir: File,
    * @throws LogSegmentOffsetOverflowException if we encountered a legacy 
segment with offset overflow
    */
   private[log] def recoverLog(): Long = {
+    /** return the log end offset if valid */
+    def deleteSegmentsIfLogStartGreaterThanLogEnd(): Option[Long] = {
+      if (logSegments.nonEmpty) {
+        val logEndOffset = activeSegment.readNextOffset
+        if (logEndOffset >= logStartOffset)
+          Some(logEndOffset)
+        else {
+          warn(s"Deleting all segments because logEndOffset ($logEndOffset) is 
smaller than logStartOffset ($logStartOffset). " +
+            "This could happen if segment files were deleted from the file 
system.")
+          removeAndDeleteSegments(logSegments, asyncDelete = true, LogRecovery)
+          leaderEpochCache.foreach(_.clearAndFlush())
+          producerStateManager.truncateFullyAndStartAt(logStartOffset)
+          None
+        }
+      } else None
+    }
+
     // if we have the clean shutdown marker, skip recovery
     if (!hadCleanShutdown) {
-      // okay we need to actually recover this log
       val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
       var truncated = false
 
@@ -879,16 +895,7 @@ class Log(@volatile private var _dir: File,
       }
     }
 
-    if (logSegments.nonEmpty) {
-      val logEndOffset = activeSegment.readNextOffset
-      if (logEndOffset < logStartOffset) {
-        warn(s"Deleting all segments because logEndOffset ($logEndOffset) is 
smaller than logStartOffset ($logStartOffset). " +
-          "This could happen if segment files were deleted from the file 
system.")
-        removeAndDeleteSegments(logSegments,
-          asyncDelete = true,
-          reason = LogRecovery)
-      }
-    }
+    val logEndOffsetOption = deleteSegmentsIfLogStartGreaterThanLogEnd()
 
     if (logSegments.isEmpty) {
       // no existing segments, create a new mutable segment beginning at 
logStartOffset
@@ -900,8 +907,21 @@ class Log(@volatile private var _dir: File,
         preallocate = config.preallocate))
     }
 
-    recoveryPoint = activeSegment.readNextOffset
-    recoveryPoint
+    // Update the recovery point if there was a clean shutdown and did not 
perform any changes to
+    // the segment. Otherwise, we just ensure that the recovery point is not 
ahead of the log end
+    // offset. To ensure correctness and to make it easier to reason about, 
it's best to only advance
+    // the recovery point in flush(Long). If we advanced the recovery point 
here, we could skip recovery for
+    // unflushed segments if the broker crashed after we checkpoint the 
recovery point and before we flush the
+    // segment.
+    (hadCleanShutdown, logEndOffsetOption) match {
+      case (true, Some(logEndOffset)) =>
+        recoveryPoint = logEndOffset
+        logEndOffset
+      case _ =>
+        val logEndOffset = 
logEndOffsetOption.getOrElse(activeSegment.readNextOffset)
+        recoveryPoint = Math.min(recoveryPoint, logEndOffset)
+        logEndOffset
+    }
   }
 
   // Rebuild producer state until lastOffset. This method may be called from 
the recovery code path, and thus must be
diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala
index acb9d34..1ca4d7e 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -318,7 +318,7 @@ class LogManager(logDirs: Seq[File],
           info(s"Skipping recovery for all logs in $logDirAbsolutePath since 
clean shutdown file was found")
           // Cache the clean shutdown status and use that for rest of log 
loading workflow. Delete the CleanShutdownFile
           // so that if broker crashes while loading the log, it is considered 
hard shutdown during the next boot up. KAFKA-10471
-          cleanShutdownFile.delete()
+          Files.deleteIfExists(cleanShutdownFile.toPath)
           hadCleanShutdown = true
         } else {
           // log recovery itself is being performed by `Log` class during 
initialization
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala 
b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 1a953c5..66ee5d2 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -2518,7 +2518,11 @@ class LogTest {
     log.close()
 
     // test recovery case
-    log = createLog(logDir, logConfig, lastShutdownClean = false)
+    val recoveryPoint = 10
+    log = createLog(logDir, logConfig, recoveryPoint = recoveryPoint, 
lastShutdownClean = false)
+    // the recovery point should not be updated after unclean shutdown until 
the log is flushed
+    verifyRecoveredLog(log, recoveryPoint)
+    log.flush()
     verifyRecoveredLog(log, lastOffset)
     log.close()
   }

Reply via email to