This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new 5002715 KAFKA-10101: Fix edge cases in Log.recoverLog and
LogManager.loadLogs (#8812)
5002715 is described below
commit 5002715485482a8bffd04c05110a29ca98ab097c
Author: Ismael Juma <[email protected]>
AuthorDate: Fri Feb 26 14:40:46 2021 -0800
KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs
(#8812)
1. Don't advance recovery point in `recoverLog` unless there was a clean
shutdown.
2. Ensure the recovery point is not ahead of the log end offset.
3. Clean and flush leader epoch cache and truncate producer state manager
if deleting segments due to log end offset being smaller than log start
offset.
4. If we are unable to delete clean shutdown file that exists, mark the
directory as offline (this was the intent, but the code was wrong).
Updated one test that was failing after this change to verify the new
behavior.
Reviewers: Jun Rao <[email protected]>, Jason Gustafson <[email protected]>
---
core/src/main/scala/kafka/log/Log.scala | 46 +++++++++++++++++-------
core/src/main/scala/kafka/log/LogManager.scala | 2 +-
core/src/test/scala/unit/kafka/log/LogTest.scala | 6 +++-
3 files changed, 39 insertions(+), 15 deletions(-)
diff --git a/core/src/main/scala/kafka/log/Log.scala
b/core/src/main/scala/kafka/log/Log.scala
index 2249b5e..2729154 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -849,9 +849,25 @@ class Log(@volatile private var _dir: File,
* @throws LogSegmentOffsetOverflowException if we encountered a legacy
segment with offset overflow
*/
private[log] def recoverLog(): Long = {
+ /** return the log end offset if valid */
+ def deleteSegmentsIfLogStartGreaterThanLogEnd(): Option[Long] = {
+ if (logSegments.nonEmpty) {
+ val logEndOffset = activeSegment.readNextOffset
+ if (logEndOffset >= logStartOffset)
+ Some(logEndOffset)
+ else {
+ warn(s"Deleting all segments because logEndOffset ($logEndOffset) is
smaller than logStartOffset ($logStartOffset). " +
+ "This could happen if segment files were deleted from the file
system.")
+ removeAndDeleteSegments(logSegments, asyncDelete = true, LogRecovery)
+ leaderEpochCache.foreach(_.clearAndFlush())
+ producerStateManager.truncateFullyAndStartAt(logStartOffset)
+ None
+ }
+ } else None
+ }
+
// if we have the clean shutdown marker, skip recovery
if (!hadCleanShutdown) {
- // okay we need to actually recover this log
val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
var truncated = false
@@ -879,16 +895,7 @@ class Log(@volatile private var _dir: File,
}
}
- if (logSegments.nonEmpty) {
- val logEndOffset = activeSegment.readNextOffset
- if (logEndOffset < logStartOffset) {
- warn(s"Deleting all segments because logEndOffset ($logEndOffset) is
smaller than logStartOffset ($logStartOffset). " +
- "This could happen if segment files were deleted from the file
system.")
- removeAndDeleteSegments(logSegments,
- asyncDelete = true,
- reason = LogRecovery)
- }
- }
+ val logEndOffsetOption = deleteSegmentsIfLogStartGreaterThanLogEnd()
if (logSegments.isEmpty) {
// no existing segments, create a new mutable segment beginning at
logStartOffset
@@ -900,8 +907,21 @@ class Log(@volatile private var _dir: File,
preallocate = config.preallocate))
}
- recoveryPoint = activeSegment.readNextOffset
- recoveryPoint
+ // Update the recovery point if there was a clean shutdown and did not
perform any changes to
+ // the segment. Otherwise, we just ensure that the recovery point is not
ahead of the log end
+ // offset. To ensure correctness and to make it easier to reason about,
it's best to only advance
+ // the recovery point in flush(Long). If we advanced the recovery point
here, we could skip recovery for
+ // unflushed segments if the broker crashed after we checkpoint the
recovery point and before we flush the
+ // segment.
+ (hadCleanShutdown, logEndOffsetOption) match {
+ case (true, Some(logEndOffset)) =>
+ recoveryPoint = logEndOffset
+ logEndOffset
+ case _ =>
+ val logEndOffset =
logEndOffsetOption.getOrElse(activeSegment.readNextOffset)
+ recoveryPoint = Math.min(recoveryPoint, logEndOffset)
+ logEndOffset
+ }
}
// Rebuild producer state until lastOffset. This method may be called from
the recovery code path, and thus must be
diff --git a/core/src/main/scala/kafka/log/LogManager.scala
b/core/src/main/scala/kafka/log/LogManager.scala
index acb9d34..1ca4d7e 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -318,7 +318,7 @@ class LogManager(logDirs: Seq[File],
info(s"Skipping recovery for all logs in $logDirAbsolutePath since
clean shutdown file was found")
// Cache the clean shutdown status and use that for rest of log
loading workflow. Delete the CleanShutdownFile
// so that if broker crashes while loading the log, it is considered
hard shutdown during the next boot up. KAFKA-10471
- cleanShutdownFile.delete()
+ Files.deleteIfExists(cleanShutdownFile.toPath)
hadCleanShutdown = true
} else {
// log recovery itself is being performed by `Log` class during
initialization
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala
b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 1a953c5..66ee5d2 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -2518,7 +2518,11 @@ class LogTest {
log.close()
// test recovery case
- log = createLog(logDir, logConfig, lastShutdownClean = false)
+ val recoveryPoint = 10
+ log = createLog(logDir, logConfig, recoveryPoint = recoveryPoint,
lastShutdownClean = false)
+ // the recovery point should not be updated after unclean shutdown until
the log is flushed
+ verifyRecoveredLog(log, recoveryPoint)
+ log.flush()
verifyRecoveredLog(log, lastOffset)
log.close()
}