showuon commented on code in PR #12347: URL: https://github.com/apache/kafka/pull/12347#discussion_r921173647
########## core/src/main/scala/kafka/log/LogManager.scala: ########## @@ -363,28 +390,32 @@ class LogManager(logDirs: Seq[File], val logsToLoad = Option(dir.listFiles).getOrElse(Array.empty).filter(logDir => logDir.isDirectory && UnifiedLog.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic) - val numLogsLoaded = new AtomicInteger(0) numTotalLogs += logsToLoad.length + numRemainingLogs.put(dir.getAbsolutePath, new AtomicInteger(logsToLoad.length)) val jobsForDir = logsToLoad.map { logDir => val runnable: Runnable = () => { + debug(s"Loading log $logDir") + var log = None: Option[UnifiedLog] + val logLoadStartMs = time.hiResClockMs() try { - debug(s"Loading log $logDir") - - val logLoadStartMs = time.hiResClockMs() - val log = loadLog(logDir, hadCleanShutdown, recoveryPoints, logStartOffsets, - defaultConfig, topicConfigOverrides) - val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs - val currentNumLoaded = numLogsLoaded.incrementAndGet() - - info(s"Completed load of $log with ${log.numberOfSegments} segments in ${logLoadDurationMs}ms " + - s"($currentNumLoaded/${logsToLoad.length} loaded in $logDirAbsolutePath)") + log = Some(loadLog(logDir, hadCleanShutdown, recoveryPoints, logStartOffsets, + defaultConfig, topicConfigOverrides, numRemainingSegments)) } catch { case e: IOException => handleIOException(logDirAbsolutePath, e) case e: KafkaStorageException if e.getCause.isInstanceOf[IOException] => // KafkaStorageException might be thrown, ex: during writing LeaderEpochFileCache // And while converting IOException to KafkaStorageException, we've already handled the exception. So we can ignore it here. 
+ } finally { + val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs + val remainingLogs = decNumRemainingLogs(numRemainingLogs, dir.getAbsolutePath) + val currentNumLoaded = logsToLoad.length - remainingLogs + log match { + case Some(loadedLog) => info(s"Completed load of $loadedLog with ${loadedLog.numberOfSegments} segments in ${logLoadDurationMs}ms " + + s"($currentNumLoaded/${logsToLoad.length} completed in $logDirAbsolutePath)") + case None => info(s"Error while loading logs in $logDir in ${logLoadDurationMs}ms ($currentNumLoaded/${logsToLoad.length} completed in $logDirAbsolutePath)") Review Comment: The log output might not be in order, e.g.: ... (11/100 completed in /tmp/kafkaLogs) ... (10/100 completed in /tmp/kafkaLogs) ... (12/100 completed in /tmp/kafkaLogs) but I think that's less important. Otherwise, we would need a lock in the `finally` block, which I think would affect the log recovery performance. That said, since we can make sure the metric result is in the correct order, out-of-order log output should be tolerable. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org