This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9584b48a2a8 MINOR: Better logging to distinguish clean vs unclean
loading times (#13242)
9584b48a2a8 is described below
commit 9584b48a2a873c692d5054dd06b66232dc25e080
Author: Jason Gustafson <[email protected]>
AuthorDate: Tue Feb 14 09:15:05 2023 -0800
MINOR: Better logging to distinguish clean vs unclean loading times (#13242)
Current log loading logging makes it difficult to analyze the behavior in
the case of clean and unclean shutdown. The log message looks the same either
way. Additionally, the logging about unclean recovery also catches the case
when a new broker is initializing from an empty log dir. This patch adds some
additional information to existing log messages to make it easier to
distinguish these cases.
Reviewers: David Jacot <[email protected]>
---
core/src/main/scala/kafka/log/LogManager.scala | 20 +++++++++++++++-----
1 file changed, 15 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogManager.scala
b/core/src/main/scala/kafka/log/LogManager.scala
index 6b2704d0fc1..d2b9ec56f08 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -354,6 +354,7 @@ class LogManager(logDirs: Seq[File],
error(s"Error while loading log dir $logDirAbsolutePath", e)
}
+ val uncleanLogDirs = mutable.Buffer.empty[String]
for (dir <- liveLogDirs) {
val logDirAbsolutePath = dir.getAbsolutePath
var hadCleanShutdown: Boolean = false
@@ -364,14 +365,10 @@ class LogManager(logDirs: Seq[File],
val cleanShutdownFile = new File(dir, LogLoader.CleanShutdownFile)
if (cleanShutdownFile.exists) {
- info(s"Skipping recovery for all logs in $logDirAbsolutePath since
clean shutdown file was found")
// Cache the clean shutdown status and use that for rest of log
loading workflow. Delete the CleanShutdownFile
// so that if broker crashes while loading the log, it is considered
hard shutdown during the next boot up. KAFKA-10471
Files.deleteIfExists(cleanShutdownFile.toPath)
hadCleanShutdown = true
- } else {
- // log recovery itself is being performed by `Log` class during
initialization
- info(s"Attempting recovery for all logs in $logDirAbsolutePath since
no clean shutdown file was found")
}
var recoveryPoints = Map[TopicPartition, Long]()
@@ -401,6 +398,17 @@ class LogManager(logDirs: Seq[File],
numTotalLogs += logsToLoad.length
numRemainingLogs.put(dir.getAbsolutePath, logsToLoad.length)
+ if (logsToLoad.isEmpty) {
+ info(s"No logs found to be loaded in $logDirAbsolutePath")
+ } else if (hadCleanShutdown) {
+ info(s"Skipping recovery of ${logsToLoad.length} logs from
$logDirAbsolutePath since " +
+ "clean shutdown file was found")
+ } else {
+ info(s"Recovering ${logsToLoad.length} logs from $logDirAbsolutePath
since no " +
+ "clean shutdown file was found")
+ uncleanLogDirs.append(logDirAbsolutePath)
+ }
+
val jobsForDir = logsToLoad.map { logDir =>
val runnable: Runnable = () => {
debug(s"Loading log $logDir")
@@ -454,7 +462,9 @@ class LogManager(logDirs: Seq[File],
threadPools.foreach(_.shutdown())
}
- info(s"Loaded $numTotalLogs logs in ${time.hiResClockMs() - startMs}ms.")
+ val elapsedMs = time.hiResClockMs() - startMs
+ val printedUncleanLogDirs = if (uncleanLogDirs.isEmpty) "" else s"
(unclean log dirs = $uncleanLogDirs)"
+ info(s"Loaded $numTotalLogs logs in ${elapsedMs}ms$printedUncleanLogDirs")
}
private[log] def addLogRecoveryMetrics(numRemainingLogs:
ConcurrentMap[String, Int],