This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9584b48a2a8 MINOR: Better logging to distinguish clean vs unclean loading times (#13242)
9584b48a2a8 is described below

commit 9584b48a2a873c692d5054dd06b66232dc25e080
Author: Jason Gustafson <[email protected]>
AuthorDate: Tue Feb 14 09:15:05 2023 -0800

    MINOR: Better logging to distinguish clean vs unclean loading times (#13242)
    
    Current log loading logging makes it difficult to analyze the behavior in 
the case of clean and unclean shutdown. The log message looks the same either 
way. Additionally, the logging about unclean recovery also catches the case 
when a new broker is initializing from an empty log dir. This patch adds some 
additional information to existing log messages to make it easier to 
distinguish these cases.
    
    Reviewers: David Jacot <[email protected]>
---
 core/src/main/scala/kafka/log/LogManager.scala | 20 +++++++++++++++-----
 1 file changed, 15 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 6b2704d0fc1..d2b9ec56f08 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -354,6 +354,7 @@ class LogManager(logDirs: Seq[File],
       error(s"Error while loading log dir $logDirAbsolutePath", e)
     }
 
+    val uncleanLogDirs = mutable.Buffer.empty[String]
     for (dir <- liveLogDirs) {
       val logDirAbsolutePath = dir.getAbsolutePath
       var hadCleanShutdown: Boolean = false
@@ -364,14 +365,10 @@ class LogManager(logDirs: Seq[File],
 
         val cleanShutdownFile = new File(dir, LogLoader.CleanShutdownFile)
         if (cleanShutdownFile.exists) {
-          info(s"Skipping recovery for all logs in $logDirAbsolutePath since clean shutdown file was found")
           // Cache the clean shutdown status and use that for rest of log loading workflow. Delete the CleanShutdownFile
           // so that if broker crashes while loading the log, it is considered hard shutdown during the next boot up. KAFKA-10471
           Files.deleteIfExists(cleanShutdownFile.toPath)
           hadCleanShutdown = true
-        } else {
-          // log recovery itself is being performed by `Log` class during initialization
-          info(s"Attempting recovery for all logs in $logDirAbsolutePath since no clean shutdown file was found")
         }
 
         var recoveryPoints = Map[TopicPartition, Long]()
@@ -401,6 +398,17 @@ class LogManager(logDirs: Seq[File],
         numTotalLogs += logsToLoad.length
         numRemainingLogs.put(dir.getAbsolutePath, logsToLoad.length)
 
+        if (logsToLoad.isEmpty) {
+          info(s"No logs found to be loaded in $logDirAbsolutePath")
+        } else if (hadCleanShutdown) {
+          info(s"Skipping recovery of ${logsToLoad.length} logs from $logDirAbsolutePath since " +
+            "clean shutdown file was found")
+        } else {
+          info(s"Recovering ${logsToLoad.length} logs from $logDirAbsolutePath since no " +
+            "clean shutdown file was found")
+          uncleanLogDirs.append(logDirAbsolutePath)
+        }
+
         val jobsForDir = logsToLoad.map { logDir =>
           val runnable: Runnable = () => {
             debug(s"Loading log $logDir")
@@ -454,7 +462,9 @@ class LogManager(logDirs: Seq[File],
       threadPools.foreach(_.shutdown())
     }
 
-    info(s"Loaded $numTotalLogs logs in ${time.hiResClockMs() - startMs}ms.")
+    val elapsedMs = time.hiResClockMs() - startMs
+    val printedUncleanLogDirs = if (uncleanLogDirs.isEmpty) "" else s" (unclean log dirs = $uncleanLogDirs)"
+    info(s"Loaded $numTotalLogs logs in ${elapsedMs}ms$printedUncleanLogDirs")
   }
 
   private[log] def addLogRecoveryMetrics(numRemainingLogs: ConcurrentMap[String, Int],

Reply via email to