This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.4 by this push:
new 6624d070348 KAFKA-15375: fix broken clean shutdown detection logic in LogManager
6624d070348 is described below
commit 6624d070348fba764d9cefd4f110d98fe8944d8d
Author: Vincent Jiang <[email protected]>
AuthorDate: Wed Aug 30 09:19:24 2023 -0700
KAFKA-15375: fix broken clean shutdown detection logic in LogManager (#14239)

When running in kraft mode, LogManager.startup is called in a different thread than
the main broker startup thread (by BrokerMetadataPublisher, when the first metadata
update is received). If a fatal error happens during broker startup, before
LogManager.startup has completed, LogManager.shutdown may improperly mark log dirs
as having shut down cleanly.
This PR includes the following changes:
1. During LogManager startup:
   - track the hadCleanShutdown flag for each log dir
   - track the loadLogsCompleted status for each log dir
2. During LogManager shutdown:
   - do not write the clean shutdown marker file for log dirs that have
     hadCleanShutdown==false and loadLogsCompleted==false (see the sketch below)
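For illustration only (this sketch is not part of the commit): a minimal, self-contained
Scala sketch of the gating logic described above. The object and method names are
hypothetical stand-ins; only the two flag maps and the OR condition mirror what the
diff below adds to LogManager.

    import java.util.concurrent.ConcurrentHashMap

    object CleanShutdownGateSketch {
      // Per-log-dir flags, mirroring hadCleanShutdownFlags / loadLogsCompletedFlags in the patch.
      private val hadCleanShutdownFlags = new ConcurrentHashMap[String, Boolean]()
      private val loadLogsCompletedFlags = new ConcurrentHashMap[String, Boolean]()

      // Conceptually called during startup: remember whether the dir had a clean shutdown
      // marker and whether loading of all its logs completed.
      def recordStartup(logDirAbsolutePath: String, hadCleanShutdown: Boolean, allLogsLoaded: Boolean): Unit = {
        hadCleanShutdownFlags.put(logDirAbsolutePath, hadCleanShutdown)
        loadLogsCompletedFlags.put(logDirAbsolutePath, allLogsLoaded)
      }

      // Mirrors the shutdown-path condition: write the clean shutdown marker only if the dir
      // either started from a clean shutdown or finished recovering all of its logs.
      def shouldWriteCleanShutdownMarker(logDirAbsolutePath: String): Boolean =
        hadCleanShutdownFlags.getOrDefault(logDirAbsolutePath, false) ||
          loadLogsCompletedFlags.getOrDefault(logDirAbsolutePath, false)

      def main(args: Array[String]): Unit = {
        recordStartup("/tmp/kafka-logs-0", hadCleanShutdown = false, allLogsLoaded = false)
        recordStartup("/tmp/kafka-logs-1", hadCleanShutdown = true, allLogsLoaded = false)
        println(shouldWriteCleanShutdownMarker("/tmp/kafka-logs-0")) // false: marker is skipped
        println(shouldWriteCleanShutdownMarker("/tmp/kafka-logs-1")) // true: marker is still written
      }
    }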
Reviewers: Colin P. McCabe <[email protected]>
---
core/src/main/scala/kafka/log/LogManager.scala | 29 ++++++++++--
.../test/scala/unit/kafka/log/LogManagerTest.scala | 53 ++++++++++++++++++++++
2 files changed, 77 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index d31046092f3..52de8cb74a2 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -125,6 +125,12 @@ class LogManager(logDirs: Seq[File],
logDirsSet
}
+ // A map that stores hadCleanShutdown flag for each log dir.
+ private val hadCleanShutdownFlags = new ConcurrentHashMap[String, Boolean]()
+
+ // A map that tells whether all logs in a log dir had been loaded or not at startup time.
+ private val loadLogsCompletedFlags = new ConcurrentHashMap[String, Boolean]()
+
@volatile private var _cleaner: LogCleaner = _
private[kafka] def cleaner: LogCleaner = _cleaner
@@ -369,6 +375,7 @@ class LogManager(logDirs: Seq[File],
// log recovery itself is being performed by `Log` class during initialization
info(s"Attempting recovery for all logs in $logDirAbsolutePath since no clean shutdown file was found")
}
+ hadCleanShutdownFlags.put(logDirAbsolutePath, hadCleanShutdown)
var recoveryPoints = Map[TopicPartition, Long]()
try {
@@ -391,7 +398,8 @@ class LogManager(logDirs: Seq[File],
val logsToLoad = Option(dir.listFiles).getOrElse(Array.empty).filter(logDir =>
  logDir.isDirectory && UnifiedLog.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic)
numTotalLogs += logsToLoad.length
- numRemainingLogs.put(dir.getAbsolutePath, logsToLoad.length)
+ numRemainingLogs.put(logDirAbsolutePath, logsToLoad.length)
+ loadLogsCompletedFlags.put(logDirAbsolutePath, logsToLoad.isEmpty)
val jobsForDir = logsToLoad.map { logDir =>
val runnable: Runnable = () => {
@@ -409,13 +417,18 @@ class LogManager(logDirs: Seq[File],
// And while converting IOException to KafkaStorageException, we've already handled the exception. So we can ignore it here.
} finally {
val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
- val remainingLogs = decNumRemainingLogs(numRemainingLogs, dir.getAbsolutePath)
+ val remainingLogs = decNumRemainingLogs(numRemainingLogs, logDirAbsolutePath)
val currentNumLoaded = logsToLoad.length - remainingLogs
log match {
case Some(loadedLog) => info(s"Completed load of $loadedLog with ${loadedLog.numberOfSegments} segments in ${logLoadDurationMs}ms " +
  s"($currentNumLoaded/${logsToLoad.length} completed in $logDirAbsolutePath)")
case None => info(s"Error while loading logs in $logDir in ${logLoadDurationMs}ms ($currentNumLoaded/${logsToLoad.length} completed in $logDirAbsolutePath)")
}
+
+ if (remainingLogs == 0) {
+ // loadLog is completed for all logs under the logDir, mark it.
+ loadLogsCompletedFlags.put(logDirAbsolutePath, true)
+ }
}
}
runnable
@@ -612,9 +625,15 @@ class LogManager(logDirs: Seq[File],
debug(s"Updating log start offsets at $dir")
checkpointLogStartOffsetsInDir(dir, logs)
- // mark that the shutdown was clean by creating marker file
- debug(s"Writing clean shutdown marker at $dir")
- CoreUtils.swallow(Files.createFile(new File(dir, LogLoader.CleanShutdownFile).toPath), this)
+ // mark that the shutdown was clean by creating marker file for log dirs that:
+ // 1. had clean shutdown marker file; or
+ // 2. had no clean shutdown marker file, but all logs under it have been recovered at startup time
+ val logDirAbsolutePath = dir.getAbsolutePath
+ if (hadCleanShutdownFlags.getOrDefault(logDirAbsolutePath, false) ||
+ loadLogsCompletedFlags.getOrDefault(logDirAbsolutePath, false)) {
+ debug(s"Writing clean shutdown marker at $dir")
+ CoreUtils.swallow(Files.createFile(new File(dir, LogLoader.CleanShutdownFile).toPath), this)
+ }
}
}
} finally {
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index a7798418ed8..dc185fd1c0a 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -134,6 +134,59 @@ class LogManagerTest {
}
}
+ /*
+ * Test that LogManager.shutdown() doesn't create clean shutdown file for a log directory that has not completed
+ * recovery.
+ */
+ @Test
+ def testCleanShutdownFileWhenShutdownCalledBeforeStartupComplete(): Unit = {
+ // 1. create two logs under logDir
+ val topicPartition0 = new TopicPartition(name, 0)
+ val topicPartition1 = new TopicPartition(name, 1)
+ val log0 = logManager.getOrCreateLog(topicPartition0, topicId = None)
+ val log1 = logManager.getOrCreateLog(topicPartition1, topicId = None)
+ val logFile0 = new File(logDir, name + "-0")
+ val logFile1 = new File(logDir, name + "-1")
+ assertTrue(logFile0.exists)
+ assertTrue(logFile1.exists)
+
+ log0.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
+ log0.takeProducerSnapshot()
+
+ log1.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
+ log1.takeProducerSnapshot()
+
+ // 2. simulate unclean shutdown by deleting clean shutdown marker file
+ logManager.shutdown()
+ assertTrue(Files.deleteIfExists(new File(logDir, LogLoader.CleanShutdownFile).toPath))
+
+ // 3. create a new LogManager and start it in a different thread
+ @volatile var loadLogCalled = 0
+ logManager = spy(createLogManager())
+ doAnswer { invocation =>
+ // intercept LogManager.loadLog to sleep 5 seconds so that there is enough time to call LogManager.shutdown
+ // before LogManager.startup completes.
+ Thread.sleep(5000)
+ invocation.callRealMethod().asInstanceOf[UnifiedLog]
+ loadLogCalled = loadLogCalled + 1
+ }.when(logManager).loadLog(any[File], any[Boolean], any[Map[TopicPartition, Long]], any[Map[TopicPartition, Long]],
+ any[LogConfig], any[Map[String, LogConfig]], any[ConcurrentMap[String, Int]])
+
+ val t = new Thread() {
+ override def run(): Unit = { logManager.startup(Set.empty) }
+ }
+ t.start()
+
+ // 4. shutdown LogManager after the first log is loaded but before the second log is loaded
+ TestUtils.waitUntilTrue(() => loadLogCalled == 1, "Timed out waiting for only the first log to be loaded")
+ logManager.shutdown()
+ logManager = null
+
+ // 5. verify that CleanShutdownFile is not created under logDir
+ assertFalse(Files.exists(new File(logDir, LogLoader.CleanShutdownFile).toPath))
+ }
+
/**
* Test that getOrCreateLog on a non-existent log creates a new log and that we can append to the new log.
* The LogManager is configured with one invalid log directory which should be marked as offline.