showuon commented on code in PR #12347:
URL: https://github.com/apache/kafka/pull/12347#discussion_r921173647


##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -363,28 +390,32 @@ class LogManager(logDirs: Seq[File],
 
         val logsToLoad = 
Option(dir.listFiles).getOrElse(Array.empty).filter(logDir =>
           logDir.isDirectory && 
UnifiedLog.parseTopicPartitionName(logDir).topic != 
KafkaRaftServer.MetadataTopic)
-        val numLogsLoaded = new AtomicInteger(0)
         numTotalLogs += logsToLoad.length
+        numRemainingLogs.put(dir.getAbsolutePath, new 
AtomicInteger(logsToLoad.length))
 
         val jobsForDir = logsToLoad.map { logDir =>
           val runnable: Runnable = () => {
+            debug(s"Loading log $logDir")
+            var log = None: Option[UnifiedLog]
+            val logLoadStartMs = time.hiResClockMs()
             try {
-              debug(s"Loading log $logDir")
-
-              val logLoadStartMs = time.hiResClockMs()
-              val log = loadLog(logDir, hadCleanShutdown, recoveryPoints, 
logStartOffsets,
-                defaultConfig, topicConfigOverrides)
-              val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
-              val currentNumLoaded = numLogsLoaded.incrementAndGet()
-
-              info(s"Completed load of $log with ${log.numberOfSegments} 
segments in ${logLoadDurationMs}ms " +
-                s"($currentNumLoaded/${logsToLoad.length} loaded in 
$logDirAbsolutePath)")
+              log = Some(loadLog(logDir, hadCleanShutdown, recoveryPoints, 
logStartOffsets,
+                defaultConfig, topicConfigOverrides, numRemainingSegments))
             } catch {
               case e: IOException =>
                 handleIOException(logDirAbsolutePath, e)
               case e: KafkaStorageException if 
e.getCause.isInstanceOf[IOException] =>
                 // KafkaStorageException might be thrown, ex: during writing 
LeaderEpochFileCache
                 // And while converting IOException to KafkaStorageException, 
we've already handled the exception. So we can ignore it here.
+            } finally {
+              val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
+              val remainingLogs = decNumRemainingLogs(numRemainingLogs, 
dir.getAbsolutePath)
+              val currentNumLoaded = logsToLoad.length - remainingLogs
+              log match {
+                case Some(loadedLog) => info(s"Completed load of $loadedLog 
with ${loadedLog.numberOfSegments} segments in ${logLoadDurationMs}ms " +
+                  s"($currentNumLoaded/${logsToLoad.length} completed in 
$logDirAbsolutePath)")
+                case None => info(s"Error while loading logs in $logDir in 
${logLoadDurationMs}ms ($currentNumLoaded/${logsToLoad.length} completed in 
$logDirAbsolutePath)")

Review Comment:
   The log output might not be in order, e.g.:
   ... (11/100 completed in /tmp/kafkaLogs)
   ... (10/100 completed in /tmp/kafkaLogs)
   ... (12/100 completed in /tmp/kafkaLogs)
   
   but I think that's less important. Otherwise, we'd need a lock in the 
`finally` block, which I think would affect log recovery performance. That 
said, since we can make sure the metric result is in the correct order, the 
out-of-order log output should be tolerable. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to